You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2015/04/25 01:21:01 UTC
[15/34] accumulo git commit: ACCUMULO-3423 updates based on review by
[~elserj] and [~kturner]
ACCUMULO-3423 updates based on review by [~elserj] and [~kturner]
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/afa887b6
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/afa887b6
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/afa887b6
Branch: refs/heads/master
Commit: afa887b6f5f131a06497eaf1d04ba8c55b0d2877
Parents: daa38ce
Author: Eric C. Newton <er...@gmail.com>
Authored: Mon Mar 30 11:25:19 2015 -0400
Committer: Eric C. Newton <er...@gmail.com>
Committed: Mon Mar 30 11:25:19 2015 -0400
----------------------------------------------------------------------
.../core/metadata/schema/MetadataSchema.java | 2 +-
.../core/metadata/MetadataTableSchemaTest.java | 10 +-
.../server/master/state/MetaDataStateStore.java | 18 +--
.../master/state/TabletLocationState.java | 2 -
.../accumulo/server/util/ListVolumesUsed.java | 15 ++
.../server/util/MasterMetadataUtil.java | 2 +-
.../accumulo/server/util/MetadataTableUtil.java | 63 +++++---
.../gc/GarbageCollectWriteAheadLogs.java | 33 +++--
.../CloseWriteAheadLogReferences.java | 2 -
.../accumulo/master/TabletGroupWatcher.java | 2 +
.../server/GarbageCollectionLogger.java | 2 +-
.../apache/accumulo/tserver/TabletServer.java | 3 +
.../tserver/log/TabletServerLogger.java | 22 +--
.../apache/accumulo/tserver/tablet/Tablet.java | 4 +-
.../accumulo/tserver/log/LogEntryTest.java | 56 ++++++++
.../org/apache/accumulo/test/UnusedWALIT.java | 144 +++++++++++++++++++
.../java/org/apache/accumulo/test/VolumeIT.java | 17 +++
17 files changed, 337 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/afa887b6/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
index 88e11f4..d2f7d07 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
@@ -322,7 +322,7 @@ public class MetadataSchema {
Text row = new Text();
k.getRow(row);
- if (row.getLength() < section.getRowPrefix().length()) {
+ if (!row.toString().startsWith(section.getRowPrefix())) {
throw new IllegalArgumentException("Bad key " + k.toString());
}
for (int sessionStart = section.getRowPrefix().length(); sessionStart < row.getLength() - 1; sessionStart++) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/afa887b6/core/src/test/java/org/apache/accumulo/core/metadata/MetadataTableSchemaTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/metadata/MetadataTableSchemaTest.java b/core/src/test/java/org/apache/accumulo/core/metadata/MetadataTableSchemaTest.java
index dfc74cf..cfe59f2 100644
--- a/core/src/test/java/org/apache/accumulo/core/metadata/MetadataTableSchemaTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/metadata/MetadataTableSchemaTest.java
@@ -18,6 +18,7 @@
package org.apache.accumulo.core.metadata;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
@@ -28,12 +29,19 @@ public class MetadataTableSchemaTest {
@Test
public void testGetTabletServer() throws Exception {
- Key key = new Key("~wal:host:43861[14a7df0e6420003]", "log", "hdfs://localhost:50514/accumulo/wal/host:43861/70c27ab3-6662-40ab-80fb-01c1f1a59df3");
+ Key key = new Key("~wal+host:43861[14a7df0e6420003]", "log", "hdfs://localhost:50514/accumulo/wal/host:43861/70c27ab3-6662-40ab-80fb-01c1f1a59df3");
Text hostPort = new Text();
Text session = new Text();
CurrentLogsSection.getTabletServer(key, hostPort, session);
assertEquals("host:43861", hostPort.toString());
assertEquals("14a7df0e6420003", session.toString());
+ try {
+ Key bogus = new Key("~wal/host:43861[14a7df0e6420003]", "log", "hdfs://localhost:50514/accumulo/wal/host:43861/70c27ab3-6662-40ab-80fb-01c1f1a59df3");
+ CurrentLogsSection.getTabletServer(bogus, hostPort, session);
+ fail("bad argument not thrown");
+ } catch (IllegalArgumentException ex) {
+
+ }
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/afa887b6/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
index decc8c7..adcf04d 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
@@ -132,19 +132,19 @@ public class MetaDataStateStore extends TabletStateStore {
Mutation m = new Mutation(tls.extent.getMetadataEntry());
if (tls.current != null) {
tls.current.clearLocation(m);
+ if (logsForDeadServers != null) {
+ List<Path> logs = logsForDeadServers.get(tls.current);
+ if (logs != null) {
+ for (Path log : logs) {
+ LogEntry entry = new LogEntry(tls.extent, 0, tls.current.hostPort(), log.toString());
+ m.put(entry.getColumnFamily(), entry.getColumnQualifier(), entry.getValue());
+ }
+ }
+ }
}
if (tls.future != null) {
tls.future.clearFutureLocation(m);
}
- if (logsForDeadServers != null) {
- List<Path> logs = logsForDeadServers.get(tls.futureOrCurrent());
- if (logs != null) {
- for (Path log : logs) {
- LogEntry entry = new LogEntry(tls.extent, 0, tls.futureOrCurrent().hostPort(), log.toString());
- m.put(entry.getColumnFamily(), entry.getColumnQualifier(), entry.getValue());
- }
- }
- }
writer.addMutation(m);
}
} catch (Exception ex) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/afa887b6/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
index ebad2c8..a222532 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
@@ -32,8 +32,6 @@ import org.apache.hadoop.io.Text;
*/
public class TabletLocationState {
- // private static final Logger log = Logger.getLogger(TabletLocationState.class);
-
static public class BadLocationStateException extends Exception {
private static final long serialVersionUID = 1L;
private Text metadataTableEntry;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/afa887b6/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java b/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
index bf812cd..9e3fc7d 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
@@ -35,6 +35,7 @@ import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfigurationFactory;
import org.apache.accumulo.server.fs.VolumeManager.FileType;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
/**
*
@@ -120,6 +121,20 @@ public class ListVolumesUsed {
for (String volume : volumes)
System.out.println("\tVolume : " + volume);
+
+ volumes.clear();
+ scanner.clearColumns();
+ scanner.setRange(MetadataSchema.CurrentLogsSection.getRange());
+ Text path = new Text();
+ for (Entry<Key,Value> entry : scanner) {
+ MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path);
+ volumes.add(getLogURI(path.toString()));
+ }
+
+ System.out.println("Listing volumes referenced in " + name + " current logs section");
+
+ for (String volume : volumes)
+ System.out.println("\tVolume : " + volume);
}
public static void listVolumes(ClientContext context) throws Exception {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/afa887b6/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
index 24092f9..fb6c4ee 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
@@ -267,7 +267,7 @@ public class MasterMetadataUtil {
while (true) {
try {
if (zk.exists(zpath)) {
- log.debug("Removing " + zpath);
+ log.debug("Removing WAL reference for root table " + zpath);
zk.recursiveDelete(zpath, NodeMissingPolicy.SKIP);
}
break;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/afa887b6/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
index db00b9c..f5326bf 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
@@ -1065,8 +1065,12 @@ public class MetadataTableUtil {
public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException {
String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS;
String uniqueId = filename.getName();
- String path = root + "/" + CurrentLogsSection.getRowPrefix() + tabletSession.toString() + uniqueId;
- rw.putPersistentData(path, filename.toString().getBytes(UTF_8), NodeExistsPolicy.OVERWRITE);
+ StringBuilder path = new StringBuilder(root);
+ path.append("/");
+ path.append(CurrentLogsSection.getRowPrefix());
+ path.append(tabletSession.toString());
+ path.append(uniqueId);
+ rw.putPersistentData(path.toString(), filename.toString().getBytes(UTF_8), NodeExistsPolicy.OVERWRITE);
}
});
} else {
@@ -1076,12 +1080,20 @@ public class MetadataTableUtil {
if (extent.isMeta()) {
tableName = RootTable.NAME;
}
+ BatchWriter bw = null;
try {
- BatchWriter bw = context.getConnector().createBatchWriter(tableName, null);
+ bw = context.getConnector().createBatchWriter(tableName, null);
bw.addMutation(m);
- bw.close();
} catch (Exception e) {
throw new RuntimeException(e);
+ } finally {
+ if (bw != null) {
+ try {
+ bw.close();
+ } catch (Exception e2) {
+ throw new RuntimeException(e2);
+ }
+ }
}
}
}
@@ -1101,21 +1113,30 @@ public class MetadataTableUtil {
public static void markLogUnused(ClientContext context, ZooLock lock, TServerInstance tabletSession, Set<Path> all) throws AccumuloException {
try {
- BatchWriter root = context.getConnector().createBatchWriter(RootTable.NAME, null);
- BatchWriter meta = context.getConnector().createBatchWriter(MetadataTable.NAME, null);
- for (Path fname : all) {
- Text tname = new Text(fname.toString());
- Mutation m = new Mutation(MetadataSchema.CurrentLogsSection.getRowPrefix() + tabletSession.toString());
- m.putDelete(MetadataSchema.CurrentLogsSection.COLF, tname);
- root.addMutation(m);
- log.debug("deleting " + MetadataSchema.CurrentLogsSection.getRowPrefix() + tabletSession.toString() + " log:" + fname);
- m = new Mutation(MetadataSchema.CurrentLogsSection.getRowPrefix() + tabletSession.toString());
- m.put(MetadataSchema.CurrentLogsSection.COLF, tname, MetadataSchema.CurrentLogsSection.UNUSED);
- meta.addMutation(m);
- removeCurrentRootLogMarker(context, lock, tabletSession, fname);
+ BatchWriter root = null;
+ BatchWriter meta = null;
+ try {
+ root = context.getConnector().createBatchWriter(RootTable.NAME, null);
+ meta = context.getConnector().createBatchWriter(MetadataTable.NAME, null);
+ for (Path fname : all) {
+ Text tname = new Text(fname.toString());
+ Mutation m = new Mutation(MetadataSchema.CurrentLogsSection.getRowPrefix() + tabletSession.toString());
+ m.putDelete(MetadataSchema.CurrentLogsSection.COLF, tname);
+ root.addMutation(m);
+ log.debug("deleting " + MetadataSchema.CurrentLogsSection.getRowPrefix() + tabletSession.toString() + " log:" + fname);
+ m = new Mutation(MetadataSchema.CurrentLogsSection.getRowPrefix() + tabletSession.toString());
+ m.put(MetadataSchema.CurrentLogsSection.COLF, tname, MetadataSchema.CurrentLogsSection.UNUSED);
+ meta.addMutation(m);
+ removeCurrentRootLogMarker(context, lock, tabletSession, fname);
+ }
+ } finally {
+ if (root != null) {
+ root.close();
+ }
+ if (meta != null) {
+ meta.close();
+ }
}
- root.close();
- meta.close();
} catch (Exception ex) {
throw new AccumuloException(ex);
}
@@ -1150,8 +1171,12 @@ public class MetadataTableUtil {
Scanner scanner = context.getConnector().createScanner(table, Authorizations.EMPTY);
scanner.setRange(new Range(MetadataSchema.CurrentLogsSection.getRowPrefix() + server.toString()));
List<Path> logs = new ArrayList<>();
+ Text path = new Text();
for (Entry<Key,Value> entry : scanner) {
- logs.add(new Path(entry.getKey().getColumnQualifier().toString()));
+ MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path);
+ if (!entry.getValue().equals(MetadataSchema.CurrentLogsSection.UNUSED)) {
+ logs.add(new Path(path.toString()));
+ }
}
logsForDeadServers.put(server, logs);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/afa887b6/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index cf068ed..d523706 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -180,19 +180,28 @@ public class GarbageCollectWriteAheadLogs {
private long removeMarkers(Map<TServerInstance,Set<Path>> candidates) {
long result = 0;
try {
- BatchWriter root = context.getConnector().createBatchWriter(RootTable.NAME, null);
- BatchWriter meta = context.getConnector().createBatchWriter(MetadataTable.NAME, null);
- for (Entry<TServerInstance,Set<Path>> entry : candidates.entrySet()) {
- Mutation m = new Mutation(CurrentLogsSection.getRowPrefix() + entry.getKey().toString());
- for (Path path : entry.getValue()) {
- m.putDelete(CurrentLogsSection.COLF, new Text(path.toString()));
- result++;
+ BatchWriter root = null;
+ BatchWriter meta = null;
+ try {
+ root = context.getConnector().createBatchWriter(RootTable.NAME, null);
+ meta = context.getConnector().createBatchWriter(MetadataTable.NAME, null);
+ for (Entry<TServerInstance,Set<Path>> entry : candidates.entrySet()) {
+ Mutation m = new Mutation(CurrentLogsSection.getRowPrefix() + entry.getKey().toString());
+ for (Path path : entry.getValue()) {
+ m.putDelete(CurrentLogsSection.COLF, new Text(path.toString()));
+ result++;
+ }
+ root.addMutation(m);
+ meta.addMutation(m);
+ }
+ } finally {
+ if (meta != null) {
+ meta.close();
+ }
+ if (root != null) {
+ root.close();
}
- root.addMutation(m);
- meta.addMutation(m);
}
- meta.close();
- root.close();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
@@ -386,7 +395,7 @@ public class GarbageCollectWriteAheadLogs {
CurrentLogsSection.getPath(entry.getKey(), filename);
TServerInstance tsi = new TServerInstance(HostAndPort.fromString(hostAndPort.toString()), sessionId.toString());
Path path = new Path(filename.toString());
- if ((!currentServers.contains(tsi) || (entry.getValue().equals(CurrentLogsSection.UNUSED)) && !rootWALs.contains(path))) {
+ if (!currentServers.contains(tsi) || entry.getValue().equals(CurrentLogsSection.UNUSED) && !rootWALs.contains(path)) {
Set<Path> logs = unusedLogs.get(tsi);
if (logs == null) {
unusedLogs.put(tsi, logs = new HashSet<Path>());
http://git-wip-us.apache.org/repos/asf/accumulo/blob/afa887b6/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
index 6686cb8..455aaee 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
@@ -247,8 +247,6 @@ public class CloseWriteAheadLogReferences implements Runnable {
MetadataSchema.ReplicationSection.getFile(entry.getKey(), replFileText);
String replFile = replFileText.toString();
boolean isReferenced = referencedWals.contains(replFile);
- log.debug("replFile " + replFile);
- log.debug("referencedWals " + referencedWals);
// We only want to clean up WALs (which is everything but rfiles) and only when
// metadata doesn't have a reference to the given WAL
http://git-wip-us.apache.org/repos/asf/accumulo/blob/afa887b6/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
index a536e98..9a7c40e 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
@@ -313,6 +313,8 @@ class TabletGroupWatcher extends Daemon {
if (this.master.tserverSet.getCurrentServers().equals(currentTServers.keySet())) {
Master.log.debug(String.format("[%s] sleeping for %.2f seconds", store.name(), Master.TIME_TO_WAIT_BETWEEN_SCANS / 1000.));
eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
+ } else {
+ Master.log.info("Detected change in current tserver set, re-running state machine.");
}
} catch (Exception ex) {
Master.log.error("Error processing table state for store " + store.name(), ex);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/afa887b6/server/tserver/src/main/java/org/apache/accumulo/server/GarbageCollectionLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/GarbageCollectionLogger.java b/server/tserver/src/main/java/org/apache/accumulo/server/GarbageCollectionLogger.java
index 5fe2548..57d8da1 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/server/GarbageCollectionLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/server/GarbageCollectionLogger.java
@@ -98,7 +98,7 @@ public class GarbageCollectionLogger {
final long keepAliveTimeout = conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT);
if (lastMemoryCheckTime > 0 && lastMemoryCheckTime < now) {
final long diff = now - lastMemoryCheckTime;
- if (diff > keepAliveTimeout + 1000) {
+ if (diff > keepAliveTimeout) {
log.warn(String.format("GC pause checker not called in a timely fashion. Expected every %.1f seconds but was %.1f seconds since last check",
keepAliveTimeout / 1000., diff / 1000.));
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/afa887b6/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index ffc1c2a..9389776 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -1719,6 +1719,8 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
@Override
public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) throws TException {
log.warn("Garbage collector is attempting to remove logs through the tablet server");
+ log.warn("This is probably because your file Garbage Collector is an older version than your tablet servers.\n" +
+ "Restart your file Garbage Collector.");
}
}
@@ -3015,6 +3017,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
public void addLoggersToMetadata(DfsLogger copy, KeyExtent extent) {
TabletLevel level = TabletLevel.getLevel(extent);
+ // serialize the updates to the metadata per level: avoids updating the level more than once
synchronized (level) {
EnumSet<TabletLevel> set = metadataTableLogs.putIfAbsent(copy, EnumSet.of(level));
if (set == null || !set.contains(level) || level == TabletLevel.ROOT) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/afa887b6/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index accfc5e..0f3f642 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -262,17 +262,19 @@ public class TabletServerLogger {
throw new IllegalStateException("close should be called with write lock held!");
}
try {
- try {
- currentLog.close();
- } catch (DfsLogger.LogClosedException ex) {
- // ignore
- } catch (Throwable ex) {
- log.error("Unable to cleanly close log " + currentLog.getFileName() + ": " + ex, ex);
- } finally {
- this.tserver.walogClosed(currentLog);
+ if (null != currentLog) {
+ try {
+ currentLog.close();
+ } catch (DfsLogger.LogClosedException ex) {
+ // ignore
+ } catch (Throwable ex) {
+ log.error("Unable to cleanly close log " + currentLog.getFileName() + ": " + ex, ex);
+ } finally {
+ this.tserver.walogClosed(currentLog);
+ }
+ currentLog = null;
+ logSizeEstimate.set(0);
}
- currentLog = null;
- logSizeEstimate.set(0);
} catch (Throwable t) {
throw new IOException(t);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/afa887b6/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index b30578a..fb0adb8 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -933,8 +933,8 @@ public class Tablet implements TabletCommitter {
long count = 0;
+ String oldName = Thread.currentThread().getName();
try {
- String oldName = Thread.currentThread().getName();
Thread.currentThread().setName("Minor compacting " + this.extent);
Span span = Trace.start("write");
CompactionStats stats;
@@ -956,7 +956,6 @@ public class Tablet implements TabletCommitter {
commitSession, flushId);
} finally {
span.stop();
- Thread.currentThread().setName(oldName);
}
return new DataFileValue(stats.getFileSize(), stats.getEntriesWritten());
} catch (Exception e) {
@@ -967,6 +966,7 @@ public class Tablet implements TabletCommitter {
failed = true;
throw new RuntimeException(e);
} finally {
+ Thread.currentThread().setName(oldName);
try {
getTabletMemory().finalizeMinC();
} catch (Throwable t) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/afa887b6/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogEntryTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogEntryTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogEntryTest.java
new file mode 100644
index 0000000..44058d3
--- /dev/null
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogEntryTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.log;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.tabletserver.log.LogEntry;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+public class LogEntryTest {
+
+ @Test
+ public void test() throws Exception {
+ KeyExtent extent = new KeyExtent(new Text("1"), null, new Text(""));
+ long ts = 12345678L;
+ String server = "localhost:1234";
+ String filename = "default/foo";
+ LogEntry entry = new LogEntry(extent, ts, server, filename);
+ assertEquals(extent, entry.extent);
+ assertEquals(server, entry.server);
+ assertEquals(filename, entry.filename);
+ assertEquals(ts, entry.timestamp);
+ assertEquals("1<; default/foo", entry.toString());
+ assertEquals(new Text("log"), entry.getColumnFamily());
+ assertEquals(new Text("localhost:1234/default/foo"), entry.getColumnQualifier());
+ LogEntry copy = LogEntry.fromBytes(entry.toBytes());
+ assertEquals(entry.toString(), copy.toString());
+ Key key = new Key(new Text("1<"), new Text("log"), new Text("localhost:1234/default/foo"));
+ key.setTimestamp(ts);
+ LogEntry copy2 = LogEntry.fromKeyValue(key, entry.getValue());
+ assertEquals(entry.toString(), copy2.toString());
+ assertEquals(entry.timestamp, copy2.timestamp);
+ assertEquals("foo", entry.getUniqueID());
+ assertEquals("localhost:1234/default/foo", entry.getName());
+ assertEquals(new Value("default/foo".getBytes()), entry.getValue());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/afa887b6/test/src/main/java/org/apache/accumulo/test/UnusedWALIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/UnusedWALIT.java b/test/src/main/java/org/apache/accumulo/test/UnusedWALIT.java
new file mode 100644
index 0000000..3684ee1
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/UnusedWALIT.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+// When reviewing the changes for ACCUMULO-3423, kturner suggested
+// "tablets will now have log references that contain no data,
+// so it may be marked with 3 WALs, the first with data, the 2nd without, a 3rd with data.
+// It would be useful to have an IT that will test this situation."
+public class UnusedWALIT extends ConfigurableMacIT {
+
+ @Override
+ protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ final long logSize = 1024 * 1024 * 10;
+ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
+ cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, Long.toString(logSize));
+ cfg.setNumTservers(1);
+ // use raw local file system so walogs sync and flush will work
+ hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+ hadoopCoreSite.set("fs.namenode.fs-limits.min-block-size", Long.toString(logSize));
+ }
+
+ @Test(timeout = 2 * 60 * 1000)
+ public void test() throws Exception {
+ // don't want this bad boy cleaning up walog entries
+ getCluster().getClusterControl().stop(ServerType.GARBAGE_COLLECTOR);
+
+ // make two tables
+ String[] tableNames = getUniqueNames(2);
+ String bigTable = tableNames[0];
+ String lilTable = tableNames[1];
+ Connector c = getConnector();
+ c.tableOperations().create(bigTable);
+ c.tableOperations().create(lilTable);
+
+ // put some data in a log that should be replayed for both tables
+ writeSomeData(c, bigTable, 0, 10, 0, 10);
+ scanSomeData(c, bigTable, 0, 10, 0, 10);
+ writeSomeData(c, lilTable, 0, 1, 0, 1);
+ scanSomeData(c, lilTable, 0, 1, 0, 1);
+ assertEquals(1, getWALCount(c));
+
+ // roll the logs by pushing data into bigTable
+ writeSomeData(c, bigTable, 0, 3000, 0, 1000);
+ assertEquals(3, getWALCount(c));
+
+ // put some data in the latest log
+ writeSomeData(c, lilTable, 1, 10, 0, 10);
+ scanSomeData(c, lilTable, 1, 10, 0, 10);
+
+ // bounce the tserver
+ getCluster().getClusterControl().stop(ServerType.TABLET_SERVER);
+ getCluster().getClusterControl().start(ServerType.TABLET_SERVER);
+
+ // wait for the metadata table to be online
+ Iterators.size(c.createScanner(MetadataTable.NAME, Authorizations.EMPTY).iterator());
+
+ // check our two sets of data in different logs
+ scanSomeData(c, lilTable, 0, 1, 0, 1);
+ scanSomeData(c, lilTable, 1, 10, 0, 10);
+ }
+
+ private void scanSomeData(Connector c, String table, int startRow, int rowCount, int startCol, int colCount) throws Exception {
+ Scanner s = c.createScanner(table, Authorizations.EMPTY);
+ s.setRange(new Range(Integer.toHexString(startRow), Integer.toHexString(startRow + rowCount)));
+ int row = startRow;
+ int col = startCol;
+ for (Entry<Key,Value> entry : s) {
+ assertEquals(row, Integer.parseInt(entry.getKey().getRow().toString(), 16));
+ assertEquals(col++, Integer.parseInt(entry.getKey().getColumnQualifier().toString(), 16));
+ if (col == startCol + colCount) {
+ col = startCol;
+ row++;
+ if (row == startRow + rowCount) {
+ break;
+ }
+ }
+ }
+ assertEquals(row, startRow + rowCount);
+ }
+
+ private int getWALCount(Connector c) throws Exception {
+ Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ s.setRange(CurrentLogsSection.getRange());
+ try {
+ return Iterators.size(s.iterator());
+ } finally {
+ s.close();
+ }
+ }
+
+ private void writeSomeData(Connector conn, String table, int startRow, int rowCount, int startCol, int colCount) throws Exception {
+ BatchWriterConfig config = new BatchWriterConfig();
+ config.setMaxMemory(10 * 1024 * 1024);
+ BatchWriter bw = conn.createBatchWriter(table, config);
+ for (int r = startRow; r < startRow + rowCount; r++) {
+ Mutation m = new Mutation(Integer.toHexString(r));
+ for (int c = startCol; c < startCol + colCount; c++) {
+ m.put("", Integer.toHexString(c), "");
+ }
+ bw.addMutation(m);
+ }
+ bw.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/afa887b6/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/VolumeIT.java b/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
index 7f1f921..7aeb135 100644
--- a/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
@@ -103,6 +103,7 @@ public class VolumeIT extends ConfigurableMacIT {
cfg.setProperty(Property.INSTANCE_DFS_DIR, v1Uri.getPath());
cfg.setProperty(Property.INSTANCE_DFS_URI, v1Uri.getScheme() + v1Uri.getHost());
cfg.setProperty(Property.INSTANCE_VOLUMES, v1.toString() + "," + v2.toString());
+ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
// use raw local file system so walogs sync and flush will work
hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
@@ -425,6 +426,21 @@ public class VolumeIT extends ConfigurableMacIT {
Assert.fail("Unexpected volume " + path);
}
+ Text path = new Text();
+ for (String table : new String[]{RootTable.NAME, MetadataTable.NAME}) {
+ Scanner meta = conn.createScanner(table, Authorizations.EMPTY);
+ meta.setRange(MetadataSchema.CurrentLogsSection.getRange());
+ outer: for (Entry<Key,Value> entry : meta) {
+ MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path);
+ for (int i = 0; i < paths.length; i++) {
+ if (path.toString().startsWith(paths[i].toString())) {
+ continue outer;
+ }
+ }
+ Assert.fail("Unexpected volume " + path);
+ }
+ }
+
// if a volume is chosen randomly for each tablet, then the probability that a volume will not be chosen for any tablet is ((num_volumes -
// 1)/num_volumes)^num_tablets. For 100 tablets and 3 volumes the probability that only 2 volumes would be chosen is 2.46e-18
@@ -435,6 +451,7 @@ public class VolumeIT extends ConfigurableMacIT {
}
Assert.assertEquals(200, sum);
+
}
@Test