You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2015/05/10 23:05:51 UTC
[05/19] accumulo git commit: Revert "ACCUMULO-3772 GC must read both current_logs and walogs in ZK"
Revert "ACCUMULO-3772 GC must read both current_logs and walogs in ZK"
This reverts commit 2938cfad91949b205e500bc73137e15e713c4262.
Conflicts:
server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/4004071c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/4004071c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/4004071c
Branch: refs/heads/1.7
Commit: 4004071c0f3228a0903c618155e4944521f46837
Parents: e7571e2
Author: Josh Elser <el...@apache.org>
Authored: Sat May 9 14:42:04 2015 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Sat May 9 14:42:04 2015 -0400
----------------------------------------------------------------------
.../gc/GarbageCollectWriteAheadLogs.java | 153 +++++------------
.../gc/GarbageCollectWriteAheadLogsTest.java | 168 -------------------
2 files changed, 39 insertions(+), 282 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4004071c/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index 194d357..40acf8b 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -32,7 +32,6 @@ import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
@@ -55,7 +54,6 @@ import org.apache.accumulo.core.trace.Span;
import org.apache.accumulo.core.trace.Trace;
import org.apache.accumulo.core.volume.Volume;
import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.fate.zookeeper.ZooReader;
import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.fs.VolumeManager;
@@ -78,7 +76,6 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.net.HostAndPort;
@@ -87,10 +84,6 @@ import com.google.protobuf.InvalidProtocolBufferException;
public class GarbageCollectWriteAheadLogs {
private static final Logger log = LoggerFactory.getLogger(GarbageCollectWriteAheadLogs.class);
- // The order of these is _very_ important. Must read from current_logs, then walogs because ZooTabletStateStore writes to
- // walogs and then removes from current_logs
- private static final String[] ZK_LOG_SUFFIXES = new String[] {RootTable.ZROOT_TABLET_CURRENT_LOGS, RootTable.ZROOT_TABLET_WALOGS};
-
private final AccumuloServerContext context;
private final VolumeManager fs;
private final boolean useTrash;
@@ -120,26 +113,6 @@ public class GarbageCollectWriteAheadLogs {
liveServers.startListeningForTabletServerChanges();
}
- /**
- * Creates a new GC WAL object. Meant for testing -- allows mocked objects.
- *
- * @param context
- * the collection server's context
- * @param fs
- * volume manager to use
- * @param useTrash
- * true to move files to trash rather than delete them
- * @param liveTServerSet
- * a started LiveTServerSet instance
- */
- @VisibleForTesting
- GarbageCollectWriteAheadLogs(AccumuloServerContext context, VolumeManager fs, boolean useTrash, LiveTServerSet liveTServerSet) throws IOException {
- this.context = context;
- this.fs = fs;
- this.useTrash = useTrash;
- this.liveServers = liveTServerSet;
- }
-
public void collect(GCStatus status) {
Span span = Trace.start("getCandidates");
@@ -397,6 +370,9 @@ public class GarbageCollectWriteAheadLogs {
return metaScanner;
}
+
+
+
/**
* Scans log markers. The map passed in is populated with the logs for dead servers.
*
@@ -404,10 +380,16 @@ public class GarbageCollectWriteAheadLogs {
* map of dead server to log file entries
* @return total number of log files
*/
- private long getCurrent(Map<TServerInstance,Set<Path>> unusedLogs, Set<TServerInstance> currentServers) throws Exception {
- // Logs for the Root table are stored in ZooKeeper.
- Set<Path> rootWALs = getRootLogs(ZooReaderWriter.getInstance(), context.getInstance());
-
+ private long getCurrent(Map<TServerInstance, Set<Path> > unusedLogs, Set<TServerInstance> currentServers) throws Exception {
+ Set<Path> rootWALs = new HashSet<>();
+ // Get entries in zookeeper:
+ String zpath = ZooUtil.getRoot(context.getInstance()) + RootTable.ZROOT_TABLET_WALOGS;
+ ZooReaderWriter zoo = ZooReaderWriter.getInstance();
+ List<String> children = zoo.getChildren(zpath);
+ for (String child : children) {
+ LogEntry entry = LogEntry.fromBytes(zoo.getData(zpath + "/" + child, null));
+ rootWALs.add(new Path(entry.filename));
+ }
long count = 0;
// get all the WAL markers that are not in zookeeper for dead servers
@@ -421,13 +403,10 @@ public class GarbageCollectWriteAheadLogs {
Text filename = new Text();
while (entries.hasNext()) {
Entry<Key,Value> entry = entries.next();
-
CurrentLogsSection.getTabletServer(entry.getKey(), hostAndPort, sessionId);
CurrentLogsSection.getPath(entry.getKey(), filename);
TServerInstance tsi = new TServerInstance(HostAndPort.fromString(hostAndPort.toString()), sessionId.toString());
Path path = new Path(filename.toString());
-
- // A log is unused iff it's for a tserver which we don't know about or the log is marked as unused and it's not listed as used by the Root table
if (!currentServers.contains(tsi) || entry.getValue().equals(CurrentLogsSection.UNUSED) && !rootWALs.contains(path)) {
Set<Path> logs = unusedLogs.get(tsi);
if (logs == null) {
@@ -441,88 +420,34 @@ public class GarbageCollectWriteAheadLogs {
// scan HDFS for logs for dead servers
for (Volume volume : VolumeManagerImpl.get().getVolumes()) {
- addUnusedWalsFromVolume(volume.getFileSystem().listFiles(volume.prefixChild(ServerConstants.WAL_DIR), true), unusedLogs, context.getConnector()
- .getInstance().getZooKeepersSessionTimeOut());
- }
- return count;
- }
-
- /**
- * Fetch the WALs which are, or were, referenced by the Root Table
- *
- * @return The Set of WALs which are needed by the Root Table
- */
- Set<Path> getRootLogs(final ZooReader zoo, Instance instance) throws Exception {
- final HashSet<Path> rootWALs = new HashSet<>();
- final String zkRoot = ZooUtil.getRoot(instance);
-
- // Get entries in zookeeper -- order in ZK_LOG_SUFFIXES is _very_ important
- for (String pathSuffix : ZK_LOG_SUFFIXES) {
- addLogsForNode(zoo, zkRoot + pathSuffix, rootWALs);
- }
-
- return rootWALs;
- }
-
- /**
- * Read all WALs from the given path in ZooKeeper and add the paths to each WAL to the provided <code>rootWALs</code>
- *
- * @param reader
- * A reader to ZooKeeper
- * @param zpath
- * The base path to read in ZooKeeper
- * @param rootWALs
- * A Set to collect the WALs in
- */
- void addLogsForNode(ZooReader reader, String zpath, HashSet<Path> rootWALs) throws Exception {
- // Get entries in zookeeper:
- List<String> children = reader.getChildren(zpath);
- for (String child : children) {
- LogEntry entry = LogEntry.fromBytes(reader.getData(zpath + "/" + child, null));
- rootWALs.add(new Path(entry.filename));
- }
- }
-
- /**
- * Given a traversal over the <code>wals</code> directory on a {@link Volume}, add all unused WALs
- *
- * @param iter
- * Iterator over files in the <code>wals</code> directory
- * @param unusedLogs
- * Map tracking unused WALs by server
- */
- void addUnusedWalsFromVolume(RemoteIterator<LocatedFileStatus> iter, Map<TServerInstance,Set<Path>> unusedLogs, int zkTimeout) throws Exception {
- while (iter.hasNext()) {
- LocatedFileStatus next = iter.next();
- // recursive listing returns directories, too
- if (next.isDirectory()) {
- continue;
- }
- // make sure we've waited long enough for zookeeper propagation
- //
- // We aren't guaranteed to see the updates through the live tserver set just because the time since the file was
- // last modified is longer than the ZK timeout.
- // 1. If we think the server is alive, but it's actually dead, we'll grab it on a later cycle. Which is OK.
- // 2. If we think the server is dead but it happened to be restarted it's possible to have a server which would differ only by session.
- // This is also OK because the new TServer will create a new WAL.
- if (System.currentTimeMillis() - next.getModificationTime() < zkTimeout) {
- continue;
- }
- Path path = next.getPath();
- String hostPlusPort = path.getParent().getName();
- // server is still alive, or has a replacement (same host+port, different session)
- TServerInstance instance = liveServers.find(hostPlusPort);
- if (instance != null) {
- continue;
- }
- TServerInstance fake = new TServerInstance(hostPlusPort, 0L);
- Set<Path> paths = unusedLogs.get(fake);
- if (paths == null) {
- paths = new HashSet<>();
+ RemoteIterator<LocatedFileStatus> iter = volume.getFileSystem().listFiles(volume.prefixChild(ServerConstants.WAL_DIR), true);
+ while (iter.hasNext()) {
+ LocatedFileStatus next = iter.next();
+ // recursive listing returns directories, too
+ if (next.isDirectory()) {
+ continue;
+ }
+ // make sure we've waited long enough for zookeeper propagation
+ if (System.currentTimeMillis() - next.getModificationTime() < context.getConnector().getInstance().getZooKeepersSessionTimeOut()) {
+ continue;
+ }
+ Path path = next.getPath();
+ String hostPlusPort = path.getParent().getName();
+ // server is still alive, or has a replacement
+ TServerInstance instance = liveServers.find(hostPlusPort);
+ if (instance != null) {
+ continue;
+ }
+ TServerInstance fake = new TServerInstance(hostPlusPort, 0L);
+ Set<Path> paths = unusedLogs.get(fake);
+ if (paths == null) {
+ paths = new HashSet<>();
+ }
+ paths.add(path);
+ unusedLogs.put(fake, paths);
}
- paths.add(path);
- unusedLogs.put(fake, paths);
}
+ return count;
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4004071c/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
deleted file mode 100644
index f431159..0000000
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.gc;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.data.impl.KeyExtent;
-import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.tabletserver.log.LogEntry;
-import org.apache.accumulo.fate.zookeeper.ZooReader;
-import org.apache.accumulo.server.AccumuloServerContext;
-import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.master.LiveTServerSet;
-import org.apache.accumulo.server.master.state.TServerInstance;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.io.Text;
-import org.easymock.EasyMock;
-import org.junit.Test;
-
-public class GarbageCollectWriteAheadLogsTest {
-
- private Map<TServerInstance,Set<Path>> runTest(LiveTServerSet tserverSet, String tserver, long modificationTime) throws Exception {
- // Mocks
- AccumuloServerContext context = EasyMock.createMock(AccumuloServerContext.class);
- VolumeManager fs = EasyMock.createMock(VolumeManager.class);
- final LocatedFileStatus fileStatus = EasyMock.createMock(LocatedFileStatus.class);
-
- // Concrete objs
- GarbageCollectWriteAheadLogs gcWals = new GarbageCollectWriteAheadLogs(context, fs, true, tserverSet);
-
- RemoteIterator<LocatedFileStatus> iter = new RemoteIterator<LocatedFileStatus>() {
- boolean returnedOnce = false;
-
- @Override
- public boolean hasNext() throws IOException {
- return !returnedOnce;
- }
-
- @Override
- public LocatedFileStatus next() throws IOException {
- returnedOnce = true;
- return fileStatus;
- }
- };
-
- Map<TServerInstance,Set<Path>> unusedLogs = new HashMap<>();
-
- // Path is /accumulo/wals/host+port/UUID
- Path walPath = new Path("/accumulo/wals/" + tserver + "/" + UUID.randomUUID().toString());
- EasyMock.expect(fileStatus.getPath()).andReturn(walPath).anyTimes();
-
- EasyMock.expect(fileStatus.getModificationTime()).andReturn(modificationTime);
- EasyMock.expect(fileStatus.isDirectory()).andReturn(false);
-
- EasyMock.replay(context, fs, fileStatus, tserverSet);
-
- gcWals.addUnusedWalsFromVolume(iter, unusedLogs, 0);
-
- EasyMock.verify(context, fs, fileStatus, tserverSet);
-
- return unusedLogs;
- }
-
- @Test
- public void testUnnoticedServerFailure() throws Exception {
- LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
- TServerInstance tserverInstance = EasyMock.createMock(TServerInstance.class);
- String tserver = "host1+9997";
-
- // We find the TServer
- EasyMock.expect(tserverSet.find(tserver)).andReturn(tserverInstance);
-
- // But the modificationTime for the WAL was _way_ in the past.
- long modificationTime = 0l;
-
- // If the modification time for a WAL was significantly in the past (so far that the server _should_ have died
- // by now) but the GC hasn't observed via ZK Watcher that the server died, we would not treat the
- // WAL as unused.
- Map<TServerInstance,Set<Path>> unusedLogs = runTest(tserverSet, tserver, modificationTime);
-
- // We think the server is still alive, therefore we don't call the WAL unused.
- assertEquals(0, unusedLogs.size());
- }
-
- @Test
- public void testUnnoticedServerRestart() throws Exception {
- LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
- String tserver = "host1+9997";
-
- // The server was _once_ alive, but we saw it died.
- // Before the LiveTServerSet gets the Watcher that it's back online (with a new session)
- // the GC runs
- EasyMock.expect(tserverSet.find(tserver)).andReturn(null);
-
- // Modification time for the WAL was _way_ in the past.
- long modificationTime = 0l;
-
- Map<TServerInstance,Set<Path>> unusedLogs = runTest(tserverSet, tserver, modificationTime);
-
- // If the same server comes back, it will use a new WAL, not the old one. The log should be unused
- assertEquals(1, unusedLogs.size());
- }
-
- @Test
- public void testAllRootLogsInZk() throws Exception {
- // Mocks
- AccumuloServerContext context = EasyMock.createMock(AccumuloServerContext.class);
- VolumeManager fs = EasyMock.createMock(VolumeManager.class);
- LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
- Instance instance = EasyMock.createMock(Instance.class);
- ZooReader zoo = EasyMock.createMock(ZooReader.class);
-
- // Fake out some WAL references
- final String instanceId = UUID.randomUUID().toString();
- final List<String> currentLogs = Arrays.asList("2");
- final List<String> walogs = Arrays.asList("1");
- LogEntry currentLogEntry = new LogEntry(new KeyExtent(new Text("+r"), null, null), 2, "host1:9997", "/accumulo/wals/host1+9997/2");
- LogEntry prevLogEntry = new LogEntry(new KeyExtent(new Text("+r"), null, null), 1, "host1:9997", "/accumulo/wals/host1+9997/1");
-
- GarbageCollectWriteAheadLogs gcWals = new GarbageCollectWriteAheadLogs(context, fs, true, tserverSet);
-
- // Define the expectations
- EasyMock.expect(instance.getInstanceID()).andReturn(instanceId).anyTimes();
- EasyMock.expect(zoo.getChildren(Constants.ZROOT + "/" + instanceId + RootTable.ZROOT_TABLET_CURRENT_LOGS)).andReturn(currentLogs);
- EasyMock.expect(zoo.getChildren(Constants.ZROOT + "/" + instanceId + RootTable.ZROOT_TABLET_WALOGS)).andReturn(walogs);
-
- EasyMock.expect(zoo.getData(Constants.ZROOT + "/" + instanceId + RootTable.ZROOT_TABLET_CURRENT_LOGS + "/2", null)).andReturn(currentLogEntry.toBytes());
- EasyMock.expect(zoo.getData(Constants.ZROOT + "/" + instanceId + RootTable.ZROOT_TABLET_WALOGS + "/1", null)).andReturn(prevLogEntry.toBytes());
-
- EasyMock.replay(instance, zoo);
-
- // Ensure that we see logs from both current_logs and walogs
- Set<Path> rootWals = gcWals.getRootLogs(zoo, instance);
-
- EasyMock.verify(instance, zoo);
-
- assertEquals(2, rootWals.size());
- assertTrue("Expected to find WAL from walogs", rootWals.remove(new Path("/accumulo/wals/host1+9997/1")));
- assertTrue("Expected to find WAL from current_logs", rootWals.remove(new Path("/accumulo/wals/host1+9997/2")));
- }
-}