Posted to commits@accumulo.apache.org by ec...@apache.org on 2015/05/21 18:47:08 UTC
[2/4] accumulo git commit: ACCUMULO-3423 use zookeeper to track WAL state
ACCUMULO-3423 use zookeeper to track WAL state
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/0b487930
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/0b487930
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/0b487930
Branch: refs/heads/master
Commit: 0b487930b4096ab9ed76e19735b1cc1cc6512b9e
Parents: 61f9279
Author: Eric Newton <er...@gmail.com>
Authored: Thu May 21 11:44:00 2015 -0400
Committer: Eric Newton <er...@gmail.com>
Committed: Thu May 21 11:44:00 2015 -0400
----------------------------------------------------------------------
.../core/metadata/schema/MetadataSchema.java | 47 --
.../core/metadata/MetadataTableSchemaTest.java | 47 --
.../apache/accumulo/server/init/Initialize.java | 2 +
.../apache/accumulo/server/log/WalMarker.java | 217 +++++++++
.../server/master/state/MetaDataStateStore.java | 29 --
.../server/master/state/TServerInstance.java | 14 +-
.../server/master/state/TabletStateStore.java | 6 -
.../master/state/ZooTabletStateStore.java | 10 -
.../accumulo/server/util/ListVolumesUsed.java | 13 +-
.../accumulo/server/util/MetadataTableUtil.java | 138 ------
.../gc/GarbageCollectWriteAheadLogs.java | 439 ++++++-------------
.../CloseWriteAheadLogReferences.java | 58 +--
.../gc/GarbageCollectWriteAheadLogsTest.java | 259 ++++++-----
.../CloseWriteAheadLogReferencesTest.java | 132 ------
.../accumulo/master/TabletGroupWatcher.java | 14 +-
.../apache/accumulo/tserver/TabletServer.java | 38 +-
.../tserver/log/TabletServerLogger.java | 9 +-
.../org/apache/accumulo/test/UnusedWALIT.java | 17 +-
.../java/org/apache/accumulo/test/VolumeIT.java | 23 +-
.../accumulo/test/functional/WALSunnyDayIT.java | 28 +-
...bageCollectorCommunicatesWithTServersIT.java | 22 +-
.../test/replication/ReplicationIT.java | 44 +-
22 files changed, 630 insertions(+), 976 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
index fe75f9e..8a98ba0 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
@@ -16,14 +16,11 @@
*/
package org.apache.accumulo.core.metadata.schema;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
import org.apache.accumulo.core.client.admin.TimeType;
import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.schema.Section;
import org.apache.accumulo.core.util.ColumnFQ;
import org.apache.hadoop.io.Text;
@@ -282,48 +279,4 @@ public class MetadataSchema {
}
}
- /**
- * Holds references to the WALs in use in a live Tablet Server.
- * <p>
- * <code>~wal+tserver:port[sessionId] log:hdfs://localhost:8020/accumulo/wal/tserver+port/WAL [] -></code>
- */
- public static class CurrentLogsSection {
- private static final Section section = new Section(RESERVED_PREFIX + "wal+", true, RESERVED_PREFIX + "wal,", false);
- private static byte LEFT_BRACKET = (byte) '[';
- public static final Text COLF = new Text("log");
- public static final Value UNUSED = new Value("unused".getBytes(UTF_8));
-
- public static Range getRange() {
- return section.getRange();
- }
-
- public static String getRowPrefix() {
- return section.getRowPrefix();
- }
-
- public static void getTabletServer(Key k, Text hostPort, Text session) {
- Preconditions.checkNotNull(k);
- Preconditions.checkNotNull(hostPort);
- Preconditions.checkNotNull(session);
-
- Text row = new Text();
- k.getRow(row);
- if (!row.toString().startsWith(section.getRowPrefix())) {
- throw new IllegalArgumentException("Bad key " + k.toString());
- }
- for (int sessionStart = section.getRowPrefix().length(); sessionStart < row.getLength() - 1; sessionStart++) {
- if (row.charAt(sessionStart) == LEFT_BRACKET) {
- hostPort.set(row.getBytes(), section.getRowPrefix().length(), sessionStart - section.getRowPrefix().length());
- session.set(row.getBytes(), sessionStart + 1, row.getLength() - sessionStart - 2);
- return;
- }
- }
- throw new IllegalArgumentException("Bad key " + k.toString());
- }
-
- public static void getPath(Key k, Text path) {
- k.getColumnQualifier(path);
- }
-
- }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/core/src/test/java/org/apache/accumulo/core/metadata/MetadataTableSchemaTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/metadata/MetadataTableSchemaTest.java b/core/src/test/java/org/apache/accumulo/core/metadata/MetadataTableSchemaTest.java
deleted file mode 100644
index cfe59f2..0000000
--- a/core/src/test/java/org/apache/accumulo/core/metadata/MetadataTableSchemaTest.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.accumulo.core.metadata;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
-import org.apache.hadoop.io.Text;
-import org.junit.Test;
-
-public class MetadataTableSchemaTest {
-
- @Test
- public void testGetTabletServer() throws Exception {
- Key key = new Key("~wal+host:43861[14a7df0e6420003]", "log", "hdfs://localhost:50514/accumulo/wal/host:43861/70c27ab3-6662-40ab-80fb-01c1f1a59df3");
- Text hostPort = new Text();
- Text session = new Text();
- CurrentLogsSection.getTabletServer(key, hostPort, session);
- assertEquals("host:43861", hostPort.toString());
- assertEquals("14a7df0e6420003", session.toString());
- try {
- Key bogus = new Key("~wal/host:43861[14a7df0e6420003]", "log", "hdfs://localhost:50514/accumulo/wal/host:43861/70c27ab3-6662-40ab-80fb-01c1f1a59df3");
- CurrentLogsSection.getTabletServer(bogus, hostPort, session);
- fail("bad argument not thrown");
- } catch (IllegalArgumentException ex) {
-
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
index 9afb93f..4753d38 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
@@ -88,6 +88,7 @@ import org.apache.accumulo.server.constraints.MetadataConstraints;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.fs.VolumeManagerImpl;
import org.apache.accumulo.server.iterators.MetadataBulkLoadFilter;
+import org.apache.accumulo.server.log.WalMarker;
import org.apache.accumulo.server.replication.ReplicationUtil;
import org.apache.accumulo.server.replication.StatusCombiner;
import org.apache.accumulo.server.security.AuditedSecurityOperation;
@@ -549,6 +550,7 @@ public class Initialize implements KeywordExecutable {
zoo.putPersistentData(zkInstanceRoot + Constants.ZMONITOR_LOCK, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
zoo.putPersistentData(zkInstanceRoot + ReplicationConstants.ZOO_BASE, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
zoo.putPersistentData(zkInstanceRoot + ReplicationConstants.ZOO_TSERVERS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
+ zoo.putPersistentData(zkInstanceRoot + WalMarker.ZWALS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
}
private String getInstanceNamePath(Opts opts) throws IOException, KeeperException, InterruptedException {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/server/base/src/main/java/org/apache/accumulo/server/log/WalMarker.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/log/WalMarker.java b/server/base/src/main/java/org/apache/accumulo/server/log/WalMarker.java
new file mode 100644
index 0000000..9cfd99f
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/log/WalMarker.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.log;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.hadoop.fs.Path;
+import org.apache.zookeeper.KeeperException;
+
+/*
+ * WAL Markers. This class governs the space in Zookeeper that advertises the status of Write-Ahead Logs
+ * in use by tablet servers and the replication machinery.
+ *
+ * The Master needs to know the state of the WALs to mark tablets during recovery.
+ * The GC needs to know when a log is no longer needed so it can be removed.
+ * The replication mechanism needs to know when a log is closed and can be forwarded to the destination table.
+ *
+ * The state of the WALs is kept in Zookeeper under <root>/wals.
+ * For each server, there is a znode formatted like the TServerInstance.toString(): "host:port[sessionid]".
+ * Under the server znode is a node for each log, named with the log's UUID.
+ * Each WAL znode holds the log's current state and the full path to the log file.
+ *
+ * The state [OPEN, CLOSED, UNREFERENCED] is what the tablet server believes to be the state of the file.
+ *
+ * In the event of a recovery, the log is identified as belonging to a dead server. The master will update
+ * the tablets assigned to that server with log references. Once all tablets have been reassigned and the log
+ * references are removed, the log will be eligible for deletion.
+ *
+ * Even when a log is UNREFERENCED by the tablet server, the replication mechanism may still need the log.
+ * The GC will defer log removal until replication is finished with it.
+ *
+ */
+public class WalMarker {
+
+ public class WalMarkerException extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ public WalMarkerException(Exception ex) {
+ super(ex);
+ }
+ }
+
+ public final static String ZWALS = "/wals";
+
+ public static enum WalState {
+ /* log is open, and may be written to */
+ OPEN,
+ /* log is closed, and will not be written to again */
+ CLOSED,
+ /* unreferenced: no tablet needs the log for recovery */
+ UNREFERENCED
+ }
+
+ private final Instance instance;
+ private final ZooReaderWriter zoo;
+
+ public WalMarker(Instance instance, ZooReaderWriter zoo) {
+ this.instance = instance;
+ this.zoo = zoo;
+ }
+
+ private String root() {
+ return ZooUtil.getRoot(instance) + ZWALS;
+ }
+
+ // Tablet server advertises its existence by creating its parent znode
+ public void initWalMarker(TServerInstance tsi) throws WalMarkerException {
+ byte[] data = new byte[0];
+ try {
+ zoo.putPersistentData(root() + "/" + tsi.toString(), data, NodeExistsPolicy.FAIL);
+ } catch (KeeperException | InterruptedException e) {
+ throw new WalMarkerException(e);
+ }
+ }
+
+ // Tablet server opens a new WAL
+ public void addNewWalMarker(TServerInstance tsi, Path path) throws WalMarkerException {
+ updateState(tsi, path, WalState.OPEN);
+ }
+
+ private void updateState(TServerInstance tsi, Path path, WalState state) throws WalMarkerException {
+ byte[] data = (state.toString() + "," + path.toString()).getBytes(UTF_8);
+ try {
+ NodeExistsPolicy policy = NodeExistsPolicy.OVERWRITE;
+ if (state == WalState.OPEN) {
+ policy = NodeExistsPolicy.FAIL;
+ }
+ zoo.putPersistentData(root() + "/" + tsi.toString() + "/" + path.getName(), data, policy);
+ } catch (KeeperException | InterruptedException e) {
+ throw new WalMarkerException(e);
+ }
+ }
+
+ // Tablet server has no references to the WAL
+ public void walUnreferenced(TServerInstance tsi, Path path) throws WalMarkerException {
+ updateState(tsi, path, WalState.UNREFERENCED);
+ }
+
+ private static Pair<WalState,Path> parse(byte[] data) {
+ String[] parts = new String(data, UTF_8).split(",", 2);
+ return new Pair<>(WalState.valueOf(parts[0]), new Path(parts[1]));
+ }
+
+ // Master needs to know the logs for the given instance
+ public List<Path> getWalsInUse(TServerInstance tsi) throws WalMarkerException {
+ List<Path> result = new ArrayList<>();
+ try {
+ String zpath = root() + "/" + tsi.toString();
+ zoo.sync(zpath);
+ for (String child : zoo.getChildren(zpath)) {
+ Pair<WalState,Path> parts = parse(zoo.getData(zpath + "/" + child, null));
+ if (parts.getFirst() != WalState.UNREFERENCED) {
+ result.add(parts.getSecond());
+ }
+ }
+ } catch (KeeperException | InterruptedException e) {
+ throw new WalMarkerException(e);
+ }
+ return result;
+ }
+
+ // garbage collector wants the list of log markers for all servers
+ public Map<TServerInstance,List<UUID>> getAllMarkers() throws WalMarkerException {
+ Map<TServerInstance,List<UUID>> result = new HashMap<>();
+ try {
+ String path = root();
+ for (String child : zoo.getChildren(path)) {
+ // children of the wals znode are distinct server names, so each
+ // server simply gets a fresh list of its log ids
+ List<UUID> logs = new ArrayList<>();
+ result.put(new TServerInstance(child), logs);
+ for (String idString : zoo.getChildren(path + "/" + child)) {
+ logs.add(UUID.fromString(idString));
+ }
+ }
+ } catch (KeeperException | InterruptedException e) {
+ throw new WalMarkerException(e);
+ }
+ return result;
+ }
+
+ // garbage collector wants to know the state (open/closed) of a log, and the filename to delete
+ public Pair<WalState,Path> state(TServerInstance instance, UUID uuid) throws WalMarkerException {
+ try {
+ String path = root() + "/" + instance.toString() + "/" + uuid.toString();
+ return parse(zoo.getData(path, null));
+ } catch (KeeperException | InterruptedException e) {
+ throw new WalMarkerException(e);
+ }
+ }
+
+ // utility combination of getAllMarkers and state
+ public Map<Path,WalState> getAllState() throws WalMarkerException {
+ Map<Path,WalState> result = new HashMap<>();
+ for (Entry<TServerInstance,List<UUID>> entry : getAllMarkers().entrySet()) {
+ for (UUID id : entry.getValue()) {
+ Pair<WalState,Path> state = state(entry.getKey(), id);
+ result.put(state.getSecond(), state.getFirst());
+ }
+ }
+ return result;
+ }
+
+ // garbage collector knows it's safe to remove the marker for a closed log
+ public void removeWalMarker(TServerInstance instance, UUID uuid) throws WalMarkerException {
+ try {
+ String path = root() + "/" + instance.toString() + "/" + uuid.toString();
+ zoo.delete(path, -1);
+ } catch (InterruptedException | KeeperException e) {
+ throw new WalMarkerException(e);
+ }
+ }
+
+ // garbage collector knows the server is dead and all of its markers have been removed
+ public void forget(TServerInstance instance) throws WalMarkerException {
+ String path = root() + "/" + instance.toString();
+ try {
+ zoo.recursiveDelete(path, NodeMissingPolicy.FAIL);
+ } catch (InterruptedException | KeeperException e) {
+ throw new WalMarkerException(e);
+ }
+ }
+
+ // tablet server can mark the log as closed (but still needed), for replication to begin
+ public void closeWal(TServerInstance instance, Path path) throws WalMarkerException {
+ updateState(instance, path, WalState.CLOSED);
+ }
+
+}
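For orientation, here is a minimal usage sketch of the WalMarker lifecycle described in the class comment above. It is not part of the commit; the host, port, session id, and WAL path are placeholder values, and it assumes a running server-side context where HdfsZooInstance and ZooReaderWriter resolve:

import java.util.UUID;

import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.log.WalMarker;
import org.apache.accumulo.server.log.WalMarker.WalState;
import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
import org.apache.hadoop.fs.Path;

public class WalMarkerLifecycleSketch {
  public static void main(String[] args) throws Exception {
    Instance instance = HdfsZooInstance.getInstance();
    WalMarker marker = new WalMarker(instance, ZooReaderWriter.getInstance());

    // A tablet server registers itself, then advertises a newly created log as OPEN.
    TServerInstance tsi = new TServerInstance("host:9997[14a7df0e6420003]");
    Path wal = new Path("hdfs://localhost:8020/accumulo/wal/host+9997/" + UUID.randomUUID());
    marker.initWalMarker(tsi);
    marker.addNewWalMarker(tsi, wal);   // znode data: "OPEN,<full path>"

    // The server stops writing: CLOSED (replication may still need the file),
    // and later UNREFERENCED once no tablet needs it for recovery.
    marker.closeWal(tsi, wal);
    marker.walUnreferenced(tsi, wal);

    // The GC reads the state and, once it is safe, deletes the marker.
    UUID id = UUID.fromString(wal.getName());
    Pair<WalState,Path> state = marker.state(tsi, id);
    if (state.getFirst() == WalState.UNREFERENCED) {
      marker.removeWalMarker(tsi, id);
    }
  }
}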
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
index 600349b..7763c25 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
@@ -19,7 +19,6 @@ package org.apache.accumulo.server.master.state;
import java.util.Collection;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.BatchWriter;
@@ -33,11 +32,8 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.log4j.Logger;
public class MetaDataStateStore extends TabletStateStore {
- private static final Logger log = Logger.getLogger(MetaDataStateStore.class);
private static final int THREADS = 4;
private static final int LATENCY = 1000;
@@ -163,29 +159,4 @@ public class MetaDataStateStore extends TabletStateStore {
return "Normal Tablets";
}
- @Override
- public void markLogsAsUnused(AccumuloServerContext context, Map<TServerInstance,List<Path>> logs) throws DistributedStoreException {
- BatchWriter writer = createBatchWriter();
- try {
- for (Entry<TServerInstance,List<Path>> entry : logs.entrySet()) {
- if (entry.getValue().isEmpty()) {
- continue;
- }
- Mutation m = new Mutation(MetadataSchema.CurrentLogsSection.getRowPrefix() + entry.getKey().toString());
- for (Path log : entry.getValue()) {
- m.put(MetadataSchema.CurrentLogsSection.COLF, new Text(log.toString()), MetadataSchema.CurrentLogsSection.UNUSED);
- }
- writer.addMutation(m);
- }
- } catch (Exception ex) {
- log.error("Error marking logs as unused: " + logs);
- throw new DistributedStoreException(ex);
- } finally {
- try {
- writer.close();
- } catch (MutationsRejectedException e) {
- throw new DistributedStoreException(e);
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java
index d2d4f44..ace9f05 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java
@@ -42,8 +42,8 @@ public class TServerInstance implements Comparable<TServerInstance>, Serializabl
// HostAndPort is not Serializable
private transient HostAndPort location;
- private String session;
- private String cachedStringRepresentation;
+ private final String session;
+ private final String cachedStringRepresentation;
public TServerInstance(HostAndPort address, String session) {
this.location = address;
@@ -51,6 +51,16 @@ public class TServerInstance implements Comparable<TServerInstance>, Serializabl
this.cachedStringRepresentation = hostPort() + "[" + session + "]";
}
+ public TServerInstance(String formattedString) {
+ int pos = formattedString.indexOf("[");
+ if (pos < 0 || !formattedString.endsWith("]")) {
+ throw new IllegalArgumentException(formattedString);
+ }
+ this.location = HostAndPort.fromString(formattedString.substring(0, pos));
+ this.session = formattedString.substring(pos + 1, formattedString.length() - 1);
+ this.cachedStringRepresentation = hostPort() + "[" + session + "]";
+ }
+
public TServerInstance(HostAndPort address, long session) {
this(address, Long.toHexString(session));
}
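A hypothetical JUnit sketch of the new parsing constructor (names and values are illustrative, not from the commit); it assumes toString() returns the cached hostPort() + "[" + session + "]" representation built above:

import static org.junit.Assert.assertEquals;

import org.apache.accumulo.server.master.state.TServerInstance;
import org.junit.Test;

public class TServerInstanceParseSketch {

  @Test
  public void roundTrip() {
    // hypothetical values in the host:port[sessionId] format used under /wals
    TServerInstance tsi = new TServerInstance("host:9997[14a7df0e6420003]");
    assertEquals("host:9997[14a7df0e6420003]", tsi.toString());
  }

  @Test(expected = IllegalArgumentException.class)
  public void rejectsMissingSession() {
    // input without a trailing "[session]" fails the format check
    new TServerInstance("host:9997");
  }
}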
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
index 147e071..3ead237 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
@@ -88,10 +88,4 @@ public abstract class TabletStateStore implements Iterable<TabletLocationState>
}
store.setLocations(Collections.singletonList(assignment));
}
-
- /**
- * When a server fails, its logs must be marked as unused after the log markers are moved to the tablets.
- */
- abstract public void markLogsAsUnused(AccumuloServerContext context, Map<TServerInstance,List<Path>> logs) throws DistributedStoreException;
-
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
index 03627e3..720046f 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
@@ -27,9 +27,7 @@ import java.util.List;
import java.util.Map;
import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
-import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
@@ -183,8 +181,6 @@ public class ZooTabletStateStore extends TabletStateStore {
throw new DistributedStoreException(ex);
}
store.put(RootTable.ZROOT_TABLET_WALOGS + "/" + logEntry.getUniqueID(), value);
- store.remove(RootTable.ZROOT_TABLET_CURRENT_LOGS + "/" + MetadataSchema.CurrentLogsSection.getRowPrefix() + tls.current.toString()
- + logEntry.getUniqueID());
}
}
}
@@ -197,10 +193,4 @@ public class ZooTabletStateStore extends TabletStateStore {
public String name() {
return "Root Table";
}
-
- @Override
- public void markLogsAsUnused(AccumuloServerContext context, Map<TServerInstance,List<Path>> logs) {
- // the root table is not replicated, so unassigning the root tablet has removed the current log marker
- }
-
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java b/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
index 9e3fc7d..7cf3f37 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
@@ -34,8 +34,9 @@ import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfigurationFactory;
import org.apache.accumulo.server.fs.VolumeManager.FileType;
+import org.apache.accumulo.server.log.WalMarker;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
/**
*
@@ -123,15 +124,13 @@ public class ListVolumesUsed {
System.out.println("\tVolume : " + volume);
volumes.clear();
- scanner.clearColumns();
- scanner.setRange(MetadataSchema.CurrentLogsSection.getRange());
- Text path = new Text();
- for (Entry<Key,Value> entry : scanner) {
- MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path);
+
+ WalMarker wals = new WalMarker(conn.getInstance(), ZooReaderWriter.getInstance());
+ for (Path path : wals.getAllState().keySet()) {
volumes.add(getLogURI(path.toString()));
}
- System.out.println("Listing volumes referenced in " + name + " current logs section");
+ System.out.println("Listing volumes referenced in " + name + " current logs");
for (String volume : volumes)
System.out.println("\tVolume : " + volume);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
index 5d5415d..45d2fef 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
@@ -59,7 +59,6 @@ import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily;
@@ -81,12 +80,10 @@ import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.accumulo.server.ServerConstants;
-import org.apache.accumulo.server.TabletLevel;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.fs.FileRef;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.fs.VolumeManagerImpl;
-import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.tablets.TabletTime;
import org.apache.accumulo.server.zookeeper.ZooLock;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
@@ -1055,139 +1052,4 @@ public class MetadataTableUtil {
return tabletEntries;
}
- public static void addNewLogMarker(ClientContext context, ZooLock zooLock, final TServerInstance tabletSession, final Path filename, TabletLevel level) {
- log.debug("Adding log entry " + filename);
- if (level == TabletLevel.ROOT) {
- LogEntry log = new LogEntry(RootTable.EXTENT, System.currentTimeMillis(), tabletSession.hostPort(), filename.toString());
- final byte[] node;
- try {
- node = log.toBytes();
- } catch (IOException e) {
- throw new RuntimeException("Failed to write to byte array", e);
- }
- retryZooKeeperUpdate(context, zooLock, new ZooOperation() {
- @Override
- public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException {
- String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS;
- String uniqueId = filename.getName();
- StringBuilder path = new StringBuilder(root);
- path.append("/");
- path.append(CurrentLogsSection.getRowPrefix());
- path.append(tabletSession.toString());
- path.append(uniqueId);
- rw.putPersistentData(path.toString(), node, NodeExistsPolicy.OVERWRITE);
- }
- });
- } else {
- Mutation m = new Mutation(CurrentLogsSection.getRowPrefix() + tabletSession.toString());
- m.put(CurrentLogsSection.COLF, new Text(filename.toString()), new Value(EMPTY_BYTES));
- String tableName = MetadataTable.NAME;
- if (level == TabletLevel.META) {
- tableName = RootTable.NAME;
- }
- BatchWriter bw = null;
- try {
- bw = context.getConnector().createBatchWriter(tableName, null);
- bw.addMutation(m);
- } catch (Exception e) {
- throw new RuntimeException(e);
- } finally {
- if (bw != null) {
- try {
- bw.close();
- } catch (Exception e2) {
- throw new RuntimeException(e2);
- }
- }
- }
- }
- }
-
- private static void removeCurrentRootLogMarker(ClientContext context, ZooLock zooLock, final TServerInstance tabletSession, final Path filename) {
- retryZooKeeperUpdate(context, zooLock, new ZooOperation() {
- @Override
- public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException {
- String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS;
- String uniqueId = filename.getName();
- String path = root + "/" + CurrentLogsSection.getRowPrefix() + tabletSession.toString() + uniqueId;
- log.debug("Removing entry " + path + " from zookeeper");
- rw.recursiveDelete(path, NodeMissingPolicy.SKIP);
- }
- });
- }
-
- public static void markLogUnused(ClientContext context, ZooLock lock, TServerInstance tabletSession, Set<Path> all) throws AccumuloException {
- // There could be a marker at the meta and/or root level, mark them both as unused
- try {
- BatchWriter root = null;
- BatchWriter meta = null;
- try {
- root = context.getConnector().createBatchWriter(RootTable.NAME, null);
- meta = context.getConnector().createBatchWriter(MetadataTable.NAME, null);
- for (Path fname : all) {
- Text tname = new Text(fname.toString());
- Mutation m = new Mutation(MetadataSchema.CurrentLogsSection.getRowPrefix() + tabletSession.toString());
- m.putDelete(MetadataSchema.CurrentLogsSection.COLF, tname);
- root.addMutation(m);
- log.debug("deleting " + MetadataSchema.CurrentLogsSection.getRowPrefix() + tabletSession.toString() + " log:" + fname);
- m = new Mutation(MetadataSchema.CurrentLogsSection.getRowPrefix() + tabletSession.toString());
- m.put(MetadataSchema.CurrentLogsSection.COLF, tname, MetadataSchema.CurrentLogsSection.UNUSED);
- meta.addMutation(m);
- removeCurrentRootLogMarker(context, lock, tabletSession, fname);
- }
- } finally {
- if (root != null) {
- root.close();
- }
- if (meta != null) {
- meta.close();
- }
- }
- } catch (Exception ex) {
- throw new AccumuloException(ex);
- }
- }
-
- public static void fetchLogsForDeadServer(ClientContext context, ZooLock lock, KeyExtent extent, TServerInstance server,
- Map<TServerInstance,List<Path>> logsForDeadServers) throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
- // already cached
- if (logsForDeadServers.containsKey(server)) {
- return;
- }
- if (extent.isRootTablet()) {
- final List<Path> logs = new ArrayList<>();
- retryZooKeeperUpdate(context, lock, new ZooOperation() {
- @Override
- public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException {
- String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS;
- logs.clear();
- for (String child : rw.getChildren(root)) {
- byte[] data = rw.getData(root + "/" + child, null);
- LogEntry entry = LogEntry.fromBytes(data);
- logs.add(new Path(entry.filename));
- }
- }
- });
- logsForDeadServers.put(server, logs);
- } else {
- // use the correct meta table
- String table = MetadataTable.NAME;
- if (extent.isMeta()) {
- table = RootTable.NAME;
- }
- // fetch the current logs in use, and put them in the cache
- Scanner scanner = context.getConnector().createScanner(table, Authorizations.EMPTY);
- scanner.setRange(new Range(MetadataSchema.CurrentLogsSection.getRowPrefix() + server.toString()));
- List<Path> logs = new ArrayList<>();
- Text path = new Text();
- for (Entry<Key,Value> entry : scanner) {
- MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path);
- if (!entry.getValue().equals(MetadataSchema.CurrentLogsSection.UNUSED)) {
- logs.add(new Path(path.toString()));
- }
- }
- logsForDeadServers.put(server, logs);
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index 194d357..414d29e 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -30,36 +30,24 @@ import java.util.UUID;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.gc.thrift.GCStatus;
import org.apache.accumulo.core.gc.thrift.GcCycleStats;
import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
-import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
-import org.apache.accumulo.core.replication.ReplicationTable;
-import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.core.trace.Span;
import org.apache.accumulo.core.trace.Trace;
-import org.apache.accumulo.core.volume.Volume;
-import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.fate.zookeeper.ZooReader;
+import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.server.AccumuloServerContext;
-import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.log.WalMarker;
+import org.apache.accumulo.server.log.WalMarker.WalMarkerException;
+import org.apache.accumulo.server.log.WalMarker.WalState;
import org.apache.accumulo.server.master.LiveTServerSet;
import org.apache.accumulo.server.master.LiveTServerSet.Listener;
import org.apache.accumulo.server.master.state.MetaDataStateStore;
@@ -67,34 +55,25 @@ import org.apache.accumulo.server.master.state.RootTabletStateStore;
import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.master.state.TabletLocationState;
import org.apache.accumulo.server.master.state.TabletState;
-import org.apache.accumulo.server.replication.StatusUtil;
-import org.apache.accumulo.server.replication.proto.Replication.Status;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
-import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.Text;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
-import com.google.common.net.HostAndPort;
-import com.google.protobuf.InvalidProtocolBufferException;
public class GarbageCollectWriteAheadLogs {
private static final Logger log = LoggerFactory.getLogger(GarbageCollectWriteAheadLogs.class);
- // The order of these is _very_ important. Must read from current_logs, then walogs because ZooTabletStateStore writes to
- // walogs and then removes from current_logs
- private static final String[] ZK_LOG_SUFFIXES = new String[] {RootTable.ZROOT_TABLET_CURRENT_LOGS, RootTable.ZROOT_TABLET_WALOGS};
-
private final AccumuloServerContext context;
private final VolumeManager fs;
private final boolean useTrash;
private final LiveTServerSet liveServers;
+ private final WalMarker walMarker;
+ private final Iterable<TabletLocationState> store;
/**
* Creates a new GC WAL object.
@@ -106,7 +85,7 @@ public class GarbageCollectWriteAheadLogs {
* @param useTrash
* true to move files to trash rather than delete them
*/
- GarbageCollectWriteAheadLogs(AccumuloServerContext context, VolumeManager fs, boolean useTrash) throws IOException {
+ GarbageCollectWriteAheadLogs(final AccumuloServerContext context, VolumeManager fs, boolean useTrash) throws IOException {
this.context = context;
this.fs = fs;
this.useTrash = useTrash;
@@ -118,6 +97,13 @@ public class GarbageCollectWriteAheadLogs {
}
});
liveServers.startListeningForTabletServerChanges();
+ this.walMarker = new WalMarker(context.getInstance(), ZooReaderWriter.getInstance());
+ this.store = new Iterable<TabletLocationState>() {
+ @Override
+ public Iterator<TabletLocationState> iterator() {
+ return Iterators.concat(new RootTabletStateStore(context).iterator(), new MetaDataStateStore(context).iterator());
+ }
+ };
}
/**
@@ -133,32 +119,45 @@ public class GarbageCollectWriteAheadLogs {
* a started LiveTServerSet instance
*/
@VisibleForTesting
- GarbageCollectWriteAheadLogs(AccumuloServerContext context, VolumeManager fs, boolean useTrash, LiveTServerSet liveTServerSet) throws IOException {
+ GarbageCollectWriteAheadLogs(AccumuloServerContext context, VolumeManager fs, boolean useTrash, LiveTServerSet liveTServerSet, WalMarker walMarker,
+ Iterable<TabletLocationState> store) throws IOException {
this.context = context;
this.fs = fs;
this.useTrash = useTrash;
this.liveServers = liveTServerSet;
+ this.walMarker = walMarker;
+ this.store = store;
}
public void collect(GCStatus status) {
Span span = Trace.start("getCandidates");
try {
- Set<TServerInstance> currentServers = liveServers.getCurrentServers();
-
status.currentLog.started = System.currentTimeMillis();
- Map<TServerInstance,Set<Path>> candidates = new HashMap<>();
- long count = getCurrent(candidates, currentServers);
+ Map<TServerInstance,Set<UUID>> logsByServer = new HashMap<>();
+ Map<UUID,Pair<WalState,Path>> logsState = new HashMap<>();
+ // Scan for log file info first: the order is important
+ // Consider:
+ // * get live servers
+ // * new server gets a lock, creates a log
+ // * get logs
+ // * the log appears to belong to a dead server
+ long count = getCurrent(logsByServer, logsState);
long fileScanStop = System.currentTimeMillis();
- log.info(String.format("Fetched %d files for %d servers in %.2f seconds", count, candidates.size(), (fileScanStop - status.currentLog.started) / 1000.));
+ log.info(String.format("Fetched %d files for %d servers in %.2f seconds", count, logsByServer.size(), (fileScanStop - status.currentLog.started) / 1000.));
status.currentLog.candidates = count;
span.stop();
+ // now it's safe to get the liveServers
+ Set<TServerInstance> currentServers = liveServers.getCurrentServers();
+
+ Map<UUID,TServerInstance> uuidToTServer;
span = Trace.start("removeEntriesInUse");
try {
- count = removeEntriesInUse(candidates, status, currentServers);
+ uuidToTServer = removeEntriesInUse(logsByServer, currentServers, logsState);
+ count = uuidToTServer.size();
} catch (Exception ex) {
log.error("Unable to scan metadata table", ex);
return;
@@ -171,7 +170,7 @@ public class GarbageCollectWriteAheadLogs {
span = Trace.start("removeReplicationEntries");
try {
- count = removeReplicationEntries(candidates, status);
+ count = removeReplicationEntries(uuidToTServer);
} catch (Exception ex) {
log.error("Unable to scan replication table", ex);
return;
@@ -184,14 +183,15 @@ public class GarbageCollectWriteAheadLogs {
span = Trace.start("removeFiles");
- count = removeFiles(candidates, status);
+ logsState.keySet().retainAll(uuidToTServer.keySet());
+ count = removeFiles(logsState.values(), status);
long removeStop = System.currentTimeMillis();
- log.info(String.format("%d total logs removed from %d servers in %.2f seconds", count, candidates.size(), (removeStop - logEntryScanStop) / 1000.));
+ log.info(String.format("%d total logs removed from %d servers in %.2f seconds", count, logsByServer.size(), (removeStop - logEntryScanStop) / 1000.));
span.stop();
span = Trace.start("removeMarkers");
- count = removeTabletServerMarkers(candidates);
+ count = removeTabletServerMarkers(uuidToTServer, logsByServer, currentServers);
long removeMarkersStop = System.currentTimeMillis();
log.info(String.format("%d markers removed in %.2f seconds", count, (removeMarkersStop - removeStop) / 1000.));
span.stop();
@@ -207,50 +207,43 @@ public class GarbageCollectWriteAheadLogs {
}
}
- private long removeTabletServerMarkers(Map<TServerInstance,Set<Path>> candidates) {
+ private long removeTabletServerMarkers(Map<UUID,TServerInstance> uidMap, Map<TServerInstance,Set<UUID>> candidates, Set<TServerInstance> liveServers) {
long result = 0;
+ // remove the markers for the files that were removed
try {
- BatchWriter root = null;
- BatchWriter meta = null;
- try {
- root = context.getConnector().createBatchWriter(RootTable.NAME, null);
- meta = context.getConnector().createBatchWriter(MetadataTable.NAME, null);
- for (Entry<TServerInstance,Set<Path>> entry : candidates.entrySet()) {
- Mutation m = new Mutation(CurrentLogsSection.getRowPrefix() + entry.getKey().toString());
- for (Path path : entry.getValue()) {
- m.putDelete(CurrentLogsSection.COLF, new Text(path.toString()));
- result++;
- }
- root.addMutation(m);
- meta.addMutation(m);
- }
- } finally {
- if (meta != null) {
- meta.close();
- }
- if (root != null) {
- root.close();
- }
+ for (Entry<UUID,TServerInstance> entry : uidMap.entrySet()) {
+ walMarker.removeWalMarker(entry.getValue(), entry.getKey());
}
} catch (Exception ex) {
throw new RuntimeException(ex);
}
+ // remove parent znode for dead tablet servers
+ for (Entry<TServerInstance,Set<UUID>> entry : candidates.entrySet()) {
+ if (!liveServers.contains(entry.getKey())) {
+ log.info("Removing znode for " + entry.getKey());
+ try {
+ walMarker.forget(entry.getKey());
+ } catch (WalMarkerException ex) {
+ log.info("Error removing znode for " + entry.getKey() + " " + ex.toString());
+ }
+ }
+ }
return result;
}
- private long removeFiles(Map<TServerInstance,Set<Path>> candidates, final GCStatus status) {
- for (Entry<TServerInstance,Set<Path>> entry : candidates.entrySet()) {
- for (Path path : entry.getValue()) {
- log.debug("Removing unused WAL for server " + entry.getKey() + " log " + path);
- try {
- if (!useTrash || !fs.moveToTrash(path))
- fs.deleteRecursively(path);
- status.currentLog.deleted++;
- } catch (FileNotFoundException ex) {
- // ignored
- } catch (IOException ex) {
- log.error("Unable to delete wal " + path + ": " + ex);
+ private long removeFiles(Collection<Pair<WalState,Path>> collection, final GCStatus status) {
+ for (Pair<WalState,Path> stateFile : collection) {
+ Path path = stateFile.getSecond();
+ log.debug("Removing " + stateFile.getFirst() + " WAL " + path);
+ try {
+ if (!useTrash || !fs.moveToTrash(path)) {
+ fs.deleteRecursively(path);
}
+ status.currentLog.deleted++;
+ } catch (FileNotFoundException ex) {
+ // ignored
+ } catch (IOException ex) {
+ log.error("Unable to delete wal " + path + ": " + ex);
}
}
return status.currentLog.deleted;
@@ -260,269 +253,107 @@ public class GarbageCollectWriteAheadLogs {
return UUID.fromString(path.getName());
}
- private long removeEntriesInUse(Map<TServerInstance,Set<Path>> candidates, GCStatus status, Set<TServerInstance> liveServers) throws IOException,
- KeeperException, InterruptedException {
-
- // remove any entries if there's a log reference, or a tablet is still assigned to the dead server
+ private Map<UUID,TServerInstance> removeEntriesInUse(Map<TServerInstance,Set<UUID>> candidates, Set<TServerInstance> liveServers,
+ Map<UUID,Pair<WalState,Path>> logsState) throws IOException, KeeperException, InterruptedException {
- Map<UUID,TServerInstance> walToDeadServer = new HashMap<>();
- for (Entry<TServerInstance,Set<Path>> entry : candidates.entrySet()) {
- for (Path file : entry.getValue()) {
- walToDeadServer.put(path2uuid(file), entry.getKey());
+ Map<UUID,TServerInstance> result = new HashMap<>();
+ for (Entry<TServerInstance,Set<UUID>> entry : candidates.entrySet()) {
+ for (UUID id : entry.getValue()) {
+ result.put(id, entry.getKey());
}
}
- long count = 0;
- RootTabletStateStore root = new RootTabletStateStore(context);
- MetaDataStateStore meta = new MetaDataStateStore(context);
- Iterator<TabletLocationState> states = Iterators.concat(root.iterator(), meta.iterator());
+
+ // remove any entries if there's a log reference (recovery hasn't finished)
+ Iterator<TabletLocationState> states = store.iterator();
while (states.hasNext()) {
- count++;
TabletLocationState state = states.next();
+
+ // Tablet is still assigned to a dead server. Master has moved markers and reassigned it.
+ // Easiest to just ignore all the WALs for the dead server.
if (state.getState(liveServers) == TabletState.ASSIGNED_TO_DEAD_SERVER) {
- candidates.remove(state.current);
+ Set<UUID> idsToIgnore = candidates.remove(state.current);
+ if (idsToIgnore != null) {
+ for (UUID id : idsToIgnore) {
+ result.remove(id);
+ }
+ }
}
+ // Tablet is being recovered and has WAL references; drop every WAL belonging to
+ // the dead server that wrote them from the removal candidates.
for (Collection<String> wals : state.walogs) {
for (String wal : wals) {
UUID walUUID = path2uuid(new Path(wal));
- TServerInstance dead = walToDeadServer.get(walUUID);
- if (dead != null) {
- Iterator<Path> iter = candidates.get(dead).iterator();
- while (iter.hasNext()) {
- if (path2uuid(iter.next()).equals(walUUID)) {
- iter.remove();
- break;
- }
+ TServerInstance dead = result.get(walUUID);
+ // There's a reference to a log file, so skip that server's logs
+ Set<UUID> idsToIgnore = candidates.remove(dead);
+ if (idsToIgnore != null) {
+ for (UUID id : idsToIgnore) {
+ result.remove(id);
}
}
}
}
}
- return count;
- }
-
- protected int removeReplicationEntries(Map<TServerInstance,Set<Path>> candidates, GCStatus status) throws IOException, KeeperException, InterruptedException {
- Connector conn;
- try {
- conn = context.getConnector();
- } catch (AccumuloException | AccumuloSecurityException e) {
- log.error("Failed to get connector", e);
- throw new IllegalArgumentException(e);
- }
-
- int count = 0;
-
- Iterator<Entry<TServerInstance,Set<Path>>> walIter = candidates.entrySet().iterator();
-
- while (walIter.hasNext()) {
- Entry<TServerInstance,Set<Path>> wal = walIter.next();
- Iterator<Path> paths = wal.getValue().iterator();
- while (paths.hasNext()) {
- Path fullPath = paths.next();
- if (neededByReplication(conn, fullPath)) {
- log.debug("Removing WAL from candidate deletion as it is still needed for replication: {}", fullPath);
- // If we haven't already removed it, check to see if this WAL is
- // "in use" by replication (needed for replication purposes)
- status.currentLog.inUse++;
- paths.remove();
- } else {
- log.debug("WAL not needed for replication {}", fullPath);
- }
- }
- if (wal.getValue().isEmpty()) {
- walIter.remove();
- }
- count++;
- }
- return count;
- }
-
- /**
- * Determine if the given WAL is needed for replication
- *
- * @param wal
- * The full path (URI)
- * @return True if the WAL is still needed by replication (not a candidate for deletion)
- */
- protected boolean neededByReplication(Connector conn, Path wal) {
- log.info("Checking replication table for " + wal);
-
- Iterable<Entry<Key,Value>> iter = getReplicationStatusForFile(conn, wal);
-
- // TODO Push down this filter to the tserver to only return records
- // that are not completely replicated and convert this loop into a
- // `return s.iterator.hasNext()` statement
- for (Entry<Key,Value> entry : iter) {
- try {
- Status status = Status.parseFrom(entry.getValue().get());
- log.info("Checking if {} is safe for removal with {}", wal, ProtobufUtil.toString(status));
- if (!StatusUtil.isSafeForRemoval(status)) {
- return true;
+ // Remove OPEN and CLOSED logs for live servers: they are still in use
+ for (TServerInstance liveServer : liveServers) {
+ Set<UUID> idsForServer = candidates.get(liveServer);
+ // Server may not have any logs yet
+ if (idsForServer != null) {
+ for (UUID id : idsForServer) {
+ Pair<WalState,Path> stateFile = logsState.get(id);
+ if (stateFile.getFirst() != WalState.UNREFERENCED) {
+ result.remove(id);
+ }
}
- } catch (InvalidProtocolBufferException e) {
- log.error("Could not deserialize Status protobuf for " + entry.getKey(), e);
}
}
-
- return false;
+ return result;
}
- protected Iterable<Entry<Key,Value>> getReplicationStatusForFile(Connector conn, Path wal) {
- Scanner metaScanner;
- try {
- metaScanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
- } catch (TableNotFoundException e) {
- throw new RuntimeException(e);
- }
-
- // Need to add in the replication section prefix
- metaScanner.setRange(Range.exact(ReplicationSection.getRowPrefix() + wal));
- // Limit the column family to be sure
- metaScanner.fetchColumnFamily(ReplicationSection.COLF);
-
+ protected int removeReplicationEntries(Map<UUID,TServerInstance> candidates) throws IOException, KeeperException, InterruptedException {
+ Connector conn;
try {
- Scanner replScanner = ReplicationTable.getScanner(conn);
-
- // Scan only the Status records
- StatusSection.limit(replScanner);
-
- // Only look for this specific WAL
- replScanner.setRange(Range.exact(wal.toString()));
+ conn = context.getConnector();
+ final Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ scanner.fetchColumnFamily(MetadataSchema.ReplicationSection.COLF);
+ scanner.setRange(MetadataSchema.ReplicationSection.getRange());
+ for (Entry<Key,Value> entry : scanner) {
+ Text file = new Text();
+ MetadataSchema.ReplicationSection.getFile(entry.getKey(), file);
+ UUID id = path2uuid(new Path(file.toString()));
+ candidates.remove(id);
+ log.info("Ignore closed log " + id + " because it is being replicated");
+ }
- return Iterables.concat(metaScanner, replScanner);
- } catch (ReplicationTableOfflineException e) {
- // do nothing
+ return candidates.size();
+ } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) {
+ log.error("Failed to scan metadata table", e);
+ throw new IllegalArgumentException(e);
}
-
- return metaScanner;
}
/**
- * Scans log markers. The map passed in is populated with the logs for dead servers.
+ * Scans log markers. The map passed in is populated with the log ids.
*
- * @param unusedLogs
+ * @param logsByServer
* map of tablet server to the ids of its log markers
* @return total number of log files
*/
- private long getCurrent(Map<TServerInstance,Set<Path>> unusedLogs, Set<TServerInstance> currentServers) throws Exception {
- // Logs for the Root table are stored in ZooKeeper.
- Set<Path> rootWALs = getRootLogs(ZooReaderWriter.getInstance(), context.getInstance());
-
- long count = 0;
-
- // get all the WAL markers that are not in zookeeper for dead servers
- Scanner rootScanner = context.getConnector().createScanner(RootTable.NAME, Authorizations.EMPTY);
- rootScanner.setRange(CurrentLogsSection.getRange());
- Scanner metaScanner = context.getConnector().createScanner(MetadataTable.NAME, Authorizations.EMPTY);
- metaScanner.setRange(CurrentLogsSection.getRange());
- Iterator<Entry<Key,Value>> entries = Iterators.concat(rootScanner.iterator(), metaScanner.iterator());
- Text hostAndPort = new Text();
- Text sessionId = new Text();
- Text filename = new Text();
- while (entries.hasNext()) {
- Entry<Key,Value> entry = entries.next();
-
- CurrentLogsSection.getTabletServer(entry.getKey(), hostAndPort, sessionId);
- CurrentLogsSection.getPath(entry.getKey(), filename);
- TServerInstance tsi = new TServerInstance(HostAndPort.fromString(hostAndPort.toString()), sessionId.toString());
- Path path = new Path(filename.toString());
-
- // A log is unused iff it's for a tserver which we don't know about or the log is marked as unused and it's not listed as used by the Root table
- if (!currentServers.contains(tsi) || entry.getValue().equals(CurrentLogsSection.UNUSED) && !rootWALs.contains(path)) {
- Set<Path> logs = unusedLogs.get(tsi);
- if (logs == null) {
- unusedLogs.put(tsi, logs = new HashSet<Path>());
- }
- if (logs.add(path)) {
- count++;
- }
- }
- }
-
- // scan HDFS for logs for dead servers
- for (Volume volume : VolumeManagerImpl.get().getVolumes()) {
- addUnusedWalsFromVolume(volume.getFileSystem().listFiles(volume.prefixChild(ServerConstants.WAL_DIR), true), unusedLogs, context.getConnector()
- .getInstance().getZooKeepersSessionTimeOut());
- }
- return count;
- }
-
- /**
- * Fetch the WALs which are, or were, referenced by the Root Table
- *
- * @return The Set of WALs which are needed by the Root Table
- */
- Set<Path> getRootLogs(final ZooReader zoo, Instance instance) throws Exception {
- final HashSet<Path> rootWALs = new HashSet<>();
- final String zkRoot = ZooUtil.getRoot(instance);
-
- // Get entries in zookeeper -- order in ZK_LOG_SUFFIXES is _very_ important
- for (String pathSuffix : ZK_LOG_SUFFIXES) {
- addLogsForNode(zoo, zkRoot + pathSuffix, rootWALs);
- }
-
- return rootWALs;
- }
-
- /**
- * Read all WALs from the given path in ZooKeeper and add the paths to each WAL to the provided <code>rootWALs</code>
- *
- * @param reader
- * A reader to ZooKeeper
- * @param zpath
- * The base path to read in ZooKeeper
- * @param rootWALs
- * A Set to collect the WALs in
- */
- void addLogsForNode(ZooReader reader, String zpath, HashSet<Path> rootWALs) throws Exception {
- // Get entries in zookeeper:
- List<String> children = reader.getChildren(zpath);
- for (String child : children) {
- LogEntry entry = LogEntry.fromBytes(reader.getData(zpath + "/" + child, null));
- rootWALs.add(new Path(entry.filename));
- }
- }
+ private long getCurrent(Map<TServerInstance,Set<UUID>> logsByServer, Map<UUID,Pair<WalState,Path>> logState) throws Exception {
- /**
- * Given a traversal over the <code>wals</code> directory on a {@link Volume}, add all unused WALs
- *
- * @param iter
- * Iterator over files in the <code>wals</code> directory
- * @param unusedLogs
- * Map tracking unused WALs by server
- */
- void addUnusedWalsFromVolume(RemoteIterator<LocatedFileStatus> iter, Map<TServerInstance,Set<Path>> unusedLogs, int zkTimeout) throws Exception {
- while (iter.hasNext()) {
- LocatedFileStatus next = iter.next();
- // recursive listing returns directories, too
- if (next.isDirectory()) {
- continue;
- }
- // make sure we've waited long enough for zookeeper propagation
- //
- // We aren't guaranteed to see the updates through the live tserver set just because the time since the file was
- // last modified is longer than the ZK timeout.
- // 1. If we think the server is alive, but it's actually dead, we'll grab it on a later cycle, which is OK.
- // 2. If we think the server is dead but it happened to be restarted, it's possible to see a server which differs only by session.
- // This is also OK because the new TServer will create a new WAL.
- if (System.currentTimeMillis() - next.getModificationTime() < zkTimeout) {
- continue;
- }
- Path path = next.getPath();
- String hostPlusPort = path.getParent().getName();
- // server is still alive, or has a replacement (same host+port, different session)
- TServerInstance instance = liveServers.find(hostPlusPort);
- if (instance != null) {
- continue;
- }
- TServerInstance fake = new TServerInstance(hostPlusPort, 0L);
- Set<Path> paths = unusedLogs.get(fake);
- if (paths == null) {
- paths = new HashSet<>();
+ // get all the WAL markers and their states from zookeeper
+ long result = 0;
+ Map<TServerInstance,List<UUID>> markers = walMarker.getAllMarkers();
+ for (Entry<TServerInstance,List<UUID>> entry : markers.entrySet()) {
+ HashSet<UUID> ids = new HashSet<>(entry.getValue().size());
+ for (UUID id : entry.getValue()) {
+ ids.add(id);
+ logState.put(id, walMarker.state(entry.getKey(), id));
+ result++;
}
- paths.add(path);
- unusedLogs.put(fake, paths);
+ logsByServer.put(entry.getKey(), ids);
}
+ return result;
}
-
}
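As an aside for readers following the new API: the replacement getCurrent() above leans entirely on the WalMarker calls visible in this diff (getAllMarkers() returning Map<TServerInstance,List<UUID>>, and state(server, id) returning Pair<WalState,Path>). A minimal, self-contained sketch of walking those markers outside the GC, assuming only those two signatures; the WalMarkerDump class and printMarkers helper are illustrative names, not part of this commit:

    import java.util.List;
    import java.util.Map;
    import java.util.UUID;

    import org.apache.accumulo.core.util.Pair;
    import org.apache.accumulo.server.log.WalMarker;
    import org.apache.accumulo.server.log.WalMarker.WalState;
    import org.apache.accumulo.server.master.state.TServerInstance;
    import org.apache.hadoop.fs.Path;

    public class WalMarkerDump {
      // Illustrative helper: enumerate every WAL marker and print its state and path.
      static void printMarkers(WalMarker walMarker) throws Exception {
        Map<TServerInstance,List<UUID>> markers = walMarker.getAllMarkers();
        for (Map.Entry<TServerInstance,List<UUID>> entry : markers.entrySet()) {
          for (UUID id : entry.getValue()) {
            Pair<WalState,Path> state = walMarker.state(entry.getKey(), id);
            System.out.println(entry.getKey() + " " + id + " " + state.getFirst() + " " + state.getSecond());
          }
        }
      }
    }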
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
index cb4b341..03d2e67 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
@@ -17,11 +17,11 @@
package org.apache.accumulo.gc.replication;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.BatchWriter;
@@ -37,7 +37,6 @@ import org.apache.accumulo.core.file.rfile.RFile;
import org.apache.accumulo.core.master.thrift.MasterClientService;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.rpc.ThriftUtil;
@@ -48,8 +47,12 @@ import org.apache.accumulo.core.trace.Trace;
import org.apache.accumulo.core.trace.Tracer;
import org.apache.accumulo.core.trace.thrift.TInfo;
import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.accumulo.server.log.WalMarker;
+import org.apache.accumulo.server.log.WalMarker.WalMarkerException;
+import org.apache.accumulo.server.log.WalMarker.WalState;
import org.apache.accumulo.server.replication.StatusUtil;
import org.apache.accumulo.server.replication.proto.Replication.Status;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.thrift.TException;
@@ -58,9 +61,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Stopwatch;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
import com.google.common.net.HostAndPort;
import com.google.protobuf.InvalidProtocolBufferException;
@@ -158,58 +158,30 @@ public class CloseWriteAheadLogReferences implements Runnable {
log.info("Closed " + recordsClosed + " WAL replication references in replication table in " + sw.toString());
}
+ static final EnumSet<WalState> NOT_OPEN = EnumSet.complementOf(EnumSet.of(WalState.OPEN));
+
/**
- * Construct the set of referenced WALs from the metadata table
+ * Construct the set of referenced WALs from zookeeper
*
* @param conn
* Connector
* @return The Set of WALs that are referenced in zookeeper
*/
protected HashSet<String> getReferencedWals(Connector conn) {
- // Make a bounded cache to alleviate repeatedly creating the same Path object
- final LoadingCache<String,String> normalizedWalPaths = CacheBuilder.newBuilder().maximumSize(1024).concurrencyLevel(1)
- .build(new CacheLoader<String,String>() {
-
- @Override
- public String load(String key) {
- return new Path(key).toString();
- }
-
- });
+ WalMarker wals = new WalMarker(conn.getInstance(), ZooReaderWriter.getInstance());
HashSet<String> referencedWals = new HashSet<>();
- BatchScanner bs = null;
try {
- // TODO Configurable number of threads
- bs = conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4);
- bs.setRanges(Collections.singleton(CurrentLogsSection.getRange()));
- bs.fetchColumnFamily(CurrentLogsSection.COLF);
-
- // For each log key/value in the metadata table
- for (Entry<Key,Value> entry : bs) {
- if (entry.getValue().equals(CurrentLogsSection.UNUSED)) {
- continue;
+ for (Entry<Path,WalState> entry : wals.getAllState().entrySet()) {
+ if (NOT_OPEN.contains(entry.getValue())) {
+ Path path = entry.getKey();
+ log.debug("Found WAL " + path.toString());
+ referencedWals.add(path.toString());
}
- Text tpath = new Text();
- CurrentLogsSection.getPath(entry.getKey(), tpath);
- String path = new Path(tpath.toString()).toString();
- log.debug("Found WAL " + path.toString());
-
- // Normalize each log file (using Path) and add it to the set
- referencedWals.add(normalizedWalPaths.get(path));
}
- } catch (TableNotFoundException e) {
- // uhhhh
+ } catch (WalMarkerException e) {
throw new RuntimeException(e);
- } catch (ExecutionException e) {
- log.error("Failed to normalize WAL file path", e);
- throw new RuntimeException(e);
- } finally {
- if (null != bs) {
- bs.close();
- }
}
-
return referencedWals;
}
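The NOT_OPEN set above is EnumSet.complementOf applied to the single OPEN state, so it matches every non-OPEN WalState. A minimal sketch of the same filtering in isolation, assuming only that WalState includes the OPEN, CLOSED, and UNREFERENCED values used elsewhere in this commit, and that getAllState() returns the Map<Path,WalState> the loop above implies; NotOpenFilter is an illustrative name:

    import java.util.EnumSet;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    import org.apache.accumulo.server.log.WalMarker.WalState;
    import org.apache.hadoop.fs.Path;

    class NotOpenFilter {
      static final EnumSet<WalState> NOT_OPEN = EnumSet.complementOf(EnumSet.of(WalState.OPEN));

      // Keep only WALs whose state is anything other than OPEN.
      static Set<String> nonOpenWals(Map<Path,WalState> allState) {
        Set<String> result = new HashSet<>();
        for (Map.Entry<Path,WalState> entry : allState.entrySet()) {
          if (NOT_OPEN.contains(entry.getValue())) {
            result.add(entry.getKey().toString());
          }
        }
        return result;
      }
    }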
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
index f431159..9cb32c8 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@ -16,153 +16,200 @@
*/
package org.apache.accumulo.gc;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
+import java.util.Map.Entry;
import java.util.UUID;
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.data.impl.KeyExtent;
-import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.tabletserver.log.LogEntry;
-import org.apache.accumulo.fate.zookeeper.ZooReader;
+import org.apache.accumulo.core.gc.thrift.GCStatus;
+import org.apache.accumulo.core.gc.thrift.GcCycleStats;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.log.WalMarker;
+import org.apache.accumulo.server.log.WalMarker.WalState;
import org.apache.accumulo.server.master.LiveTServerSet;
import org.apache.accumulo.server.master.state.TServerInstance;
-import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.accumulo.server.master.state.TabletLocationState;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.Text;
+import org.apache.zookeeper.KeeperException;
import org.easymock.EasyMock;
import org.junit.Test;
public class GarbageCollectWriteAheadLogsTest {
- private Map<TServerInstance,Set<Path>> runTest(LiveTServerSet tserverSet, String tserver, long modificationTime) throws Exception {
- // Mocks
+ private final TServerInstance server1 = new TServerInstance("localhost:1234[SESSION]");
+ private final TServerInstance server2 = new TServerInstance("localhost:1234[OTHERSESS]");
+ private final UUID id = UUID.randomUUID();
+ private final Map<TServerInstance,List<UUID>> markers = Collections.singletonMap(server1, Collections.singletonList(id));
+ private final Map<TServerInstance,List<UUID>> markers2 = Collections.singletonMap(server2, Collections.singletonList(id));
+ private final Path path = new Path("hdfs://localhost:9000/accumulo/wal/localhost+1234/" + id);
+ private final KeyExtent extent = new KeyExtent(new Text("1<"), new Text(new byte[] {0}));
+ private final Collection<Collection<String>> walogs = Collections.emptyList();
+ private final TabletLocationState tabletAssignedToServer1;
+ private final TabletLocationState tabletAssignedToServer2;
+ {
+ try {
+ tabletAssignedToServer1 = new TabletLocationState(extent, (TServerInstance) null, server1, (TServerInstance) null, walogs, false);
+ tabletAssignedToServer2 = new TabletLocationState(extent, (TServerInstance) null, server2, (TServerInstance) null, walogs, false);
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ private final Iterable<TabletLocationState> tabletOnServer1List = Collections.singletonList(tabletAssignedToServer1);
+ private final Iterable<TabletLocationState> tabletOnServer2List = Collections.singletonList(tabletAssignedToServer2);
+ private final List<Entry<Key,Value>> emptyList = Collections.emptyList();
+ private final Iterator<Entry<Key,Value>> emptyKV = emptyList.iterator();
+
+ @Test
+ public void testRemoveUnusedLog() throws Exception {
AccumuloServerContext context = EasyMock.createMock(AccumuloServerContext.class);
VolumeManager fs = EasyMock.createMock(VolumeManager.class);
- final LocatedFileStatus fileStatus = EasyMock.createMock(LocatedFileStatus.class);
-
- // Concrete objs
- GarbageCollectWriteAheadLogs gcWals = new GarbageCollectWriteAheadLogs(context, fs, true, tserverSet);
+ WalMarker marker = EasyMock.createMock(WalMarker.class);
+ LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
- RemoteIterator<LocatedFileStatus> iter = new RemoteIterator<LocatedFileStatus>() {
- boolean returnedOnce = false;
+ GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
- @Override
- public boolean hasNext() throws IOException {
- return !returnedOnce;
- }
+ EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
+ EasyMock.expect(marker.getAllMarkers()).andReturn(markers).once();
+ EasyMock.expect(marker.state(server1, id)).andReturn(new Pair<WalState,Path>(WalState.UNREFERENCED, path));
+ EasyMock.expect(fs.deleteRecursively(path)).andReturn(true).once();
+ marker.removeWalMarker(server1, id);
+ EasyMock.expectLastCall().once();
+ EasyMock.replay(context, fs, marker, tserverSet);
+ GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, false, tserverSet, marker, tabletOnServer1List) {
@Override
- public LocatedFileStatus next() throws IOException {
- returnedOnce = true;
- return fileStatus;
+ protected int removeReplicationEntries(Map<UUID,TServerInstance> candidates) throws IOException, KeeperException, InterruptedException {
+ return 0;
}
};
-
- Map<TServerInstance,Set<Path>> unusedLogs = new HashMap<>();
-
- // Path is /accumulo/wals/host+port/UUID
- Path walPath = new Path("/accumulo/wals/" + tserver + "/" + UUID.randomUUID().toString());
- EasyMock.expect(fileStatus.getPath()).andReturn(walPath).anyTimes();
-
- EasyMock.expect(fileStatus.getModificationTime()).andReturn(modificationTime);
- EasyMock.expect(fileStatus.isDirectory()).andReturn(false);
-
- EasyMock.replay(context, fs, fileStatus, tserverSet);
-
- gcWals.addUnusedWalsFromVolume(iter, unusedLogs, 0);
-
- EasyMock.verify(context, fs, fileStatus, tserverSet);
-
- return unusedLogs;
+ gc.collect(status);
+ EasyMock.verify(context, fs, marker, tserverSet);
}
@Test
- public void testUnnoticedServerFailure() throws Exception {
+ public void testKeepClosedLog() throws Exception {
+ AccumuloServerContext context = EasyMock.createMock(AccumuloServerContext.class);
+ VolumeManager fs = EasyMock.createMock(VolumeManager.class);
+ WalMarker marker = EasyMock.createMock(WalMarker.class);
LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
- TServerInstance tserverInstance = EasyMock.createMock(TServerInstance.class);
- String tserver = "host1+9997";
-
- // We find the TServer
- EasyMock.expect(tserverSet.find(tserver)).andReturn(tserverInstance);
- // But the modificationTime for the WAL was _way_ in the past.
- long modificationTime = 0l;
+ GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
- // If the modification time for a WAL was significantly in the past (so far that the server _should_ have died
- // by now) but the GC hasn't observed via ZK Watcher that the server died, we would not treat the
- // WAL as unused.
- Map<TServerInstance,Set<Path>> unusedLogs = runTest(tserverSet, tserver, modificationTime);
-
- // We think the server is still alive, therefore we don't call the WAL unused.
- assertEquals(0, unusedLogs.size());
+ EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
+ EasyMock.expect(marker.getAllMarkers()).andReturn(markers).once();
+ EasyMock.expect(marker.state(server1, id)).andReturn(new Pair<WalState,Path>(WalState.CLOSED, path));
+ EasyMock.replay(context, marker, tserverSet, fs);
+ GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, false, tserverSet, marker, tabletOnServer1List) {
+ @Override
+ protected int removeReplicationEntries(Map<UUID,TServerInstance> candidates) throws IOException, KeeperException, InterruptedException {
+ return 0;
+ }
+ };
+ gc.collect(status);
+ EasyMock.verify(context, marker, tserverSet, fs);
}
@Test
- public void testUnnoticedServerRestart() throws Exception {
+ public void deleteUnreferencedLogOnDeadServer() throws Exception {
+ AccumuloServerContext context = EasyMock.createMock(AccumuloServerContext.class);
+ VolumeManager fs = EasyMock.createMock(VolumeManager.class);
+ WalMarker marker = EasyMock.createMock(WalMarker.class);
LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
- String tserver = "host1+9997";
-
- // The server was _once_ alive, but we saw it die.
- // Before the LiveTServerSet gets the Watcher notification that it's back online (with a new session),
- // the GC runs
- EasyMock.expect(tserverSet.find(tserver)).andReturn(null);
-
- // Modification time for the WAL was _way_ in the past.
- long modificationTime = 0l;
-
- Map<TServerInstance,Set<Path>> unusedLogs = runTest(tserverSet, tserver, modificationTime);
-
- // If the same server comes back, it will use a new WAL, not the old one. The log should be unused
- assertEquals(1, unusedLogs.size());
+ Connector conn = EasyMock.createMock(Connector.class);
+ Scanner scanner = EasyMock.createMock(Scanner.class);
+
+ GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
+ EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
+ EasyMock.expect(marker.getAllMarkers()).andReturn(markers2).once();
+ EasyMock.expect(marker.state(server2, id)).andReturn(new Pair<>(WalState.OPEN, path));
+ EasyMock.expect(context.getConnector()).andReturn(conn);
+ EasyMock.expect(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)).andReturn(scanner);
+ scanner.fetchColumnFamily(MetadataSchema.ReplicationSection.COLF);
+ EasyMock.expectLastCall().once();
+ scanner.setRange(MetadataSchema.ReplicationSection.getRange());
+ EasyMock.expectLastCall().once();
+ EasyMock.expect(scanner.iterator()).andReturn(emptyKV);
+ EasyMock.expect(fs.deleteRecursively(path)).andReturn(true).once();
+ marker.removeWalMarker(server2, id);
+ EasyMock.expectLastCall().once();
+ marker.forget(server2);
+ EasyMock.expectLastCall().once();
+ EasyMock.replay(context, fs, marker, tserverSet, conn, scanner);
+ GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, false, tserverSet, marker, tabletOnServer1List);
+ gc.collect(status);
+ EasyMock.verify(context, fs, marker, tserverSet, conn, scanner);
}
@Test
- public void testAllRootLogsInZk() throws Exception {
- // Mocks
+ public void ignoreReferencedLogOnDeadServer() throws Exception {
AccumuloServerContext context = EasyMock.createMock(AccumuloServerContext.class);
VolumeManager fs = EasyMock.createMock(VolumeManager.class);
+ WalMarker marker = EasyMock.createMock(WalMarker.class);
LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
- Instance instance = EasyMock.createMock(Instance.class);
- ZooReader zoo = EasyMock.createMock(ZooReader.class);
-
- // Fake out some WAL references
- final String instanceId = UUID.randomUUID().toString();
- final List<String> currentLogs = Arrays.asList("2");
- final List<String> walogs = Arrays.asList("1");
- LogEntry currentLogEntry = new LogEntry(new KeyExtent(new Text("+r"), null, null), 2, "host1:9997", "/accumulo/wals/host1+9997/2");
- LogEntry prevLogEntry = new LogEntry(new KeyExtent(new Text("+r"), null, null), 1, "host1:9997", "/accumulo/wals/host1+9997/1");
-
- GarbageCollectWriteAheadLogs gcWals = new GarbageCollectWriteAheadLogs(context, fs, true, tserverSet);
-
- // Define the expectations
- EasyMock.expect(instance.getInstanceID()).andReturn(instanceId).anyTimes();
- EasyMock.expect(zoo.getChildren(Constants.ZROOT + "/" + instanceId + RootTable.ZROOT_TABLET_CURRENT_LOGS)).andReturn(currentLogs);
- EasyMock.expect(zoo.getChildren(Constants.ZROOT + "/" + instanceId + RootTable.ZROOT_TABLET_WALOGS)).andReturn(walogs);
-
- EasyMock.expect(zoo.getData(Constants.ZROOT + "/" + instanceId + RootTable.ZROOT_TABLET_CURRENT_LOGS + "/2", null)).andReturn(currentLogEntry.toBytes());
- EasyMock.expect(zoo.getData(Constants.ZROOT + "/" + instanceId + RootTable.ZROOT_TABLET_WALOGS + "/1", null)).andReturn(prevLogEntry.toBytes());
-
- EasyMock.replay(instance, zoo);
-
- // Ensure that we see logs from both current_logs and walogs
- Set<Path> rootWals = gcWals.getRootLogs(zoo, instance);
-
- EasyMock.verify(instance, zoo);
+ Connector conn = EasyMock.createMock(Connector.class);
+ Scanner scanner = EasyMock.createMock(Scanner.class);
+
+ GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
+ EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
+ EasyMock.expect(marker.getAllMarkers()).andReturn(markers2).once();
+ EasyMock.expect(marker.state(server2, id)).andReturn(new Pair<>(WalState.OPEN, path));
+ EasyMock.expect(context.getConnector()).andReturn(conn);
+ EasyMock.expect(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)).andReturn(scanner);
+ scanner.fetchColumnFamily(MetadataSchema.ReplicationSection.COLF);
+ EasyMock.expectLastCall().once();
+ scanner.setRange(MetadataSchema.ReplicationSection.getRange());
+ EasyMock.expectLastCall().once();
+ EasyMock.expect(scanner.iterator()).andReturn(emptyKV);
+ EasyMock.replay(context, fs, marker, tserverSet, conn, scanner);
+ GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, false, tserverSet, marker, tabletOnServer2List);
+ gc.collect(status);
+ EasyMock.verify(context, fs, marker, tserverSet, conn, scanner);
+ }
- assertEquals(2, rootWals.size());
- assertTrue("Expected to find WAL from walogs", rootWals.remove(new Path("/accumulo/wals/host1+9997/1")));
- assertTrue("Expected to find WAL from current_logs", rootWals.remove(new Path("/accumulo/wals/host1+9997/2")));
+ @Test
+ public void replicationDelaysFileCollection() throws Exception {
+ AccumuloServerContext context = EasyMock.createMock(AccumuloServerContext.class);
+ VolumeManager fs = EasyMock.createMock(VolumeManager.class);
+ WalMarker marker = EasyMock.createMock(WalMarker.class);
+ LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
+ Connector conn = EasyMock.createMock(Connector.class);
+ Scanner scanner = EasyMock.createMock(Scanner.class);
+ String row = MetadataSchema.ReplicationSection.getRowPrefix() + path.toString();
+ String colf = MetadataSchema.ReplicationSection.COLF.toString();
+ String colq = "1";
+ Map<Key,Value> replicationWork = Collections.singletonMap(new Key(row, colf, colq), new Value(new byte[0]));
+
+ GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
+
+ EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
+ EasyMock.expect(marker.getAllMarkers()).andReturn(markers).once();
+ EasyMock.expect(marker.state(server1, id)).andReturn(new Pair<WalState,Path>(WalState.UNREFERENCED, path));
+ EasyMock.expect(context.getConnector()).andReturn(conn);
+ EasyMock.expect(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)).andReturn(scanner);
+ scanner.fetchColumnFamily(MetadataSchema.ReplicationSection.COLF);
+ EasyMock.expectLastCall().once();
+ scanner.setRange(MetadataSchema.ReplicationSection.getRange());
+ EasyMock.expectLastCall().once();
+ EasyMock.expect(scanner.iterator()).andReturn(replicationWork.entrySet().iterator());
+ EasyMock.replay(context, fs, marker, tserverSet, conn, scanner);
+ GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, false, tserverSet, marker, tabletOnServer1List);
+ gc.collect(status);
+ EasyMock.verify(context, fs, marker, tserverSet, conn, scanner);
}
}
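Taken together, the tests above pin down one GC outcome per combination of server liveness, WAL state, tablet assignment, and pending replication work. A compact restatement of the decision table implied by the mock expectations (an editorial summary, not code from the commit):

    Server  WAL state     Tablet refs / replication      Expected outcome
    live    UNREFERENCED  none                           delete file, remove marker
    live    CLOSED        n/a                            keep
    dead    OPEN          no tablets on dead server      delete file, remove marker, forget server
    dead    OPEN          tablet still assigned          keep
    live    UNREFERENCED  replication entry pending      keep until replication completes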