You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2015/04/25 01:21:03 UTC
[17/34] accumulo git commit: ACCUMULO-3423 more updates based on [~kturner]'s review
ACCUMULO-3423 more updates based on [~kturner]'s review
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/affff422
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/affff422
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/affff422
Branch: refs/heads/master
Commit: affff422ddd91cd96c8ca84dc824d931a1b72f70
Parents: c71fc12
Author: Eric C. Newton <er...@gmail.com>
Authored: Mon Mar 30 16:05:59 2015 -0400
Committer: Eric C. Newton <er...@gmail.com>
Committed: Mon Mar 30 16:05:59 2015 -0400
----------------------------------------------------------------------
.../accumulo/server/util/MetadataTableUtil.java | 1 +
.../gc/GarbageCollectWriteAheadLogs.java | 34 ++++++++
.../apache/accumulo/tserver/TabletServer.java | 24 ++++--
.../tserver/log/TabletServerLogger.java | 40 ++++------
.../accumulo/test/GarbageCollectWALIT.java | 81 ++++++++++++++++++++
.../accumulo/test/functional/WALSunnyDayIT.java | 10 +++
6 files changed, 159 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/affff422/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
index f5326bf..edc6189 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
@@ -1112,6 +1112,7 @@ public class MetadataTableUtil {
}
public static void markLogUnused(ClientContext context, ZooLock lock, TServerInstance tabletSession, Set<Path> all) throws AccumuloException {
+ // There could be a marker at the meta and/or root level; mark them both as unused
try {
BatchWriter root = null;
BatchWriter meta = null;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/affff422/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index d523706..59612ab 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -50,9 +50,12 @@ import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.core.trace.Trace;
+import org.apache.accumulo.core.volume.Volume;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
import org.apache.accumulo.server.master.LiveTServerSet;
import org.apache.accumulo.server.master.LiveTServerSet.Listener;
import org.apache.accumulo.server.master.state.MetaDataStateStore;
@@ -63,7 +66,9 @@ import org.apache.accumulo.server.master.state.TabletState;
import org.apache.accumulo.server.replication.StatusUtil;
import org.apache.accumulo.server.replication.proto.Replication.Status;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.Text;
import org.apache.htrace.Span;
import org.apache.zookeeper.KeeperException;
@@ -406,6 +411,35 @@ public class GarbageCollectWriteAheadLogs {
}
}
+ // scan HDFS for logs for dead servers
+ for (Volume volume : VolumeManagerImpl.get().getVolumes()) {
+ RemoteIterator<LocatedFileStatus> iter = volume.getFileSystem().listFiles(volume.prefixChild(ServerConstants.WAL_DIR), true);
+ while (iter.hasNext()) {
+ LocatedFileStatus next = iter.next();
+ // recursive listing returns directories, too
+ if (next.isDirectory()) {
+ continue;
+ }
+ // make sure we've waited long enough for zookeeper propagation
+ if (System.currentTimeMillis() - next.getModificationTime() < context.getConnector().getInstance().getZooKeepersSessionTimeOut()) {
+ continue;
+ }
+ Path path = next.getPath();
+ String hostPlusPort = path.getParent().getName();
+ // server is still alive, or has a replacement
+ TServerInstance instance = liveServers.find(hostPlusPort);
+ if (instance != null) {
+ continue;
+ }
+ TServerInstance fake = new TServerInstance(hostPlusPort, 0L);
+ Set<Path> paths = unusedLogs.get(fake);
+ if (paths == null) {
+ paths = new HashSet<>();
+ }
+ paths.add(path);
+ unusedLogs.put(fake, paths);
+ }
+ }
return count;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/affff422/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 436fb12..e28d472 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -2988,7 +2988,14 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
}
// avoid unnecessary redundant markings to meta
- ConcurrentHashMap<DfsLogger, EnumSet<TabletLevel>> metadataTableLogs = new ConcurrentHashMap<>();
+ final ConcurrentHashMap<DfsLogger, EnumSet<TabletLevel>> metadataTableLogs = new ConcurrentHashMap<>();
+ final Object levelLocks[] = new Object[TabletLevel.values().length];
+ {
+ for (int i = 0; i < levelLocks.length; i++) {
+ levelLocks[i] = new Object();
+ }
+ }
+
// remove any meta entries after a rolled log is no longer referenced
Set<DfsLogger> closedLogs = new HashSet<>();
@@ -3018,14 +3025,19 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
public void addLoggersToMetadata(DfsLogger copy, KeyExtent extent) {
TabletLevel level = TabletLevel.getLevel(extent);
// serialize the updates to the metadata per level: avoids updating the level more than once
- synchronized (level) {
- EnumSet<TabletLevel> set = metadataTableLogs.putIfAbsent(copy, EnumSet.of(level));
+ // updating one level may cause updates to other levels, so we need to release the lock on metadataTableLogs
+ synchronized (levelLocks[level.ordinal()]) {
+ EnumSet<TabletLevel> set = null;
+ synchronized (metadataTableLogs) {
+ set = metadataTableLogs.putIfAbsent(copy, EnumSet.of(level));
+ }
if (set == null || !set.contains(level) || level == TabletLevel.ROOT) {
log.info("Writing log marker for level " + level + " " + copy.getFileName());
MetadataTableUtil.addNewLogMarker(this, this.getLock(), this.getTabletSession(), copy.getPath(), extent);
- if (set != null) {
- set.add(level);
- }
+ }
+ synchronized (metadataTableLogs) {
+ set = metadataTableLogs.get(copy);
+ set.add(level);
}
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/affff422/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 0f3f642..04e7a83 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -77,7 +78,7 @@ public class TabletServerLogger {
// The current logger
private DfsLogger currentLog = null;
- private final AtomicReference<DfsLogger> nextLog = new AtomicReference<>(null);
+ private final SynchronousQueue<DfsLogger> nextLog = new SynchronousQueue<>();
private final ThreadPoolExecutor nextLogMaker = new SimpleThreadPool(1, "WALog creator");
// The current generation of logs.
@@ -149,6 +150,7 @@ public class TabletServerLogger {
this.maxSize = maxSize;
this.syncCounter = syncCounter;
this.flushCounter = flushCounter;
+ startLogMaker();
}
private DfsLogger initializeLoggers(final AtomicInteger logIdOut) throws IOException {
@@ -198,18 +200,9 @@ public class TabletServerLogger {
}
try {
- DfsLogger next = nextLog.getAndSet(null);
- if (next != null) {
- log.info("Using next log " + next.getFileName());
- currentLog = next;
- } else {
- DfsLogger alog = new DfsLogger(tserver.getServerConfig(), syncCounter, flushCounter);
- alog.open(tserver.getClientAddressString());
- currentLog = alog;
- }
- if (nextLog.get() == null) {
- createNextLog();
- }
+ DfsLogger next = nextLog.take();
+ log.info("Using next log " + next.getFileName());
+ currentLog = next;
logId.incrementAndGet();
return;
} catch (Exception t) {
@@ -221,12 +214,11 @@ public class TabletServerLogger {
}
}
- // callers are synchronized already
- private void createNextLog() {
- if (nextLogMaker.getActiveCount() == 0) {
- nextLogMaker.submit(new Runnable() {
- @Override
- public void run() {
+ private void startLogMaker() {
+ nextLogMaker.submit(new Runnable() {
+ @Override
+ public void run() {
+ while (!nextLogMaker.isShutdown()) {
try {
log.debug("Creating next WAL");
DfsLogger alog = new DfsLogger(tserver.getServerConfig(), syncCounter, flushCounter);
@@ -235,17 +227,15 @@ public class TabletServerLogger {
tserver.addLoggersToMetadata(alog, tablet.getExtent());
}
log.debug("Created next WAL " + alog.getFileName());
- alog = nextLog.getAndSet(alog);
- if (alog != null) {
- log.debug("closing unused next log: " + alog.getFileName());
- alog.close();
+ while (!nextLog.offer(alog, 12, TimeUnit.HOURS)) {
+ log.info("Our WAL was not used for 12 hours: " + alog.getFileName());
}
} catch (Exception t) {
log.error("{}", t.getMessage(), t);
}
}
- });
- }
+ }
+ });
}
public void resetLoggers() throws IOException {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/affff422/test/src/test/java/org/apache/accumulo/test/GarbageCollectWALIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/GarbageCollectWALIT.java b/test/src/test/java/org/apache/accumulo/test/GarbageCollectWALIT.java
new file mode 100644
index 0000000..0324e4a
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/GarbageCollectWALIT.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+public class GarbageCollectWALIT extends ConfigurableMacIT {
+
+ @Override
+ protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ cfg.setProperty(Property.INSTANCE_ZK_HOST, "5s");
+ cfg.setProperty(Property.GC_CYCLE_START, "1s");
+ cfg.setProperty(Property.GC_CYCLE_DELAY, "1s");
+ cfg.setNumTservers(1);
+ hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+ }
+
+ @Test(timeout = 2 * 60 * 1000)
+ public void test() throws Exception {
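+ // keep the garbage collector from running yet: it is stopped here and restarted only after the tablet server has been stopped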
+ String tableName = getUniqueNames(1)[0];
+ cluster.getClusterControl().stop(ServerType.GARBAGE_COLLECTOR);
+ Connector c = getConnector();
+ c.tableOperations().create(tableName);
+ // count the number of WALs in the filesystem
+ assertEquals(2, countWALsInFS(cluster));
+ cluster.getClusterControl().stop(ServerType.TABLET_SERVER);
+ cluster.getClusterControl().start(ServerType.GARBAGE_COLLECTOR);
+ cluster.getClusterControl().start(ServerType.TABLET_SERVER);
+ Iterators.size(c.createScanner(MetadataTable.NAME, Authorizations.EMPTY).iterator());
+ // let GC run
+ UtilWaitThread.sleep(3 * 5 * 1000);
+ assertEquals(2, countWALsInFS(cluster));
+ }
+
+ private int countWALsInFS(MiniAccumuloClusterImpl cluster) throws Exception {
+ FileSystem fs = cluster.getFileSystem();
+ RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(new Path(cluster.getConfig().getAccumuloDir() + "/wal"), true);
+ int result = 0;
+ while (iterator.hasNext()) {
+ LocatedFileStatus next = iterator.next();
+ if (!next.isDirectory()) {
+ result++;
+ }
+ }
+ return result;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/accumulo/blob/affff422/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java b/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
index b8e36bc..3f51c8d 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.io.Text;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
+import org.junit.Assert;
import org.junit.Test;
import com.google.common.collect.Iterators;
@@ -122,6 +123,7 @@ public class WALSunnyDayIT extends ConfigurableMacIT {
for (String table: new String[] { tableName, MetadataTable.NAME, RootTable.NAME} ) {
c.tableOperations().flush(table, null, null, true);
}
+ UtilWaitThread.sleep(1000);
// rolled WAL is no longer in use, but needs to be GC'd
Map<String,Boolean> walsAfterflush = getWals(c, zoo);
assertEquals(walsAfterflush.toString(), 3, walsAfterflush.size());
@@ -151,6 +153,7 @@ public class WALSunnyDayIT extends ConfigurableMacIT {
// put some data in the WAL
assertEquals(0, cluster.exec(SetGoalState.class, "NORMAL").waitFor());
+ verifySomeData(c, tableName, 1000 * 50 + 1);
writeSomeData(c, tableName, 100, 100);
Map<String,Boolean> walsAfterRestart = getWals(c, zoo);
@@ -163,6 +166,13 @@ public class WALSunnyDayIT extends ConfigurableMacIT {
assertEquals("logs in use should be 2", 2, countTrue(walsAfterRestartAndGC.values()));
}
+ private void verifySomeData(Connector c, String tableName, int expected) throws Exception {
+ Scanner scan = c.createScanner(tableName, EMPTY);
+ int result = Iterators.size(scan.iterator());
+ scan.close();
+ Assert.assertEquals(expected, result);
+ }
+
private void writeSomeData(Connector conn, String tableName, int row, int col) throws Exception {
Random rand = new Random();
BatchWriter bw = conn.createBatchWriter(tableName, null);