You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2015/04/25 01:21:04 UTC
[18/34] accumulo git commit: ACCUMULO-3423 updates from reviews
ACCUMULO-3423 updates from reviews
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/9c2ca7a5
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/9c2ca7a5
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/9c2ca7a5
Branch: refs/heads/master
Commit: 9c2ca7a5cce8f7d0bacb100fed6405c86bde2a2d
Parents: affff42
Author: Eric C. Newton <er...@gmail.com>
Authored: Tue Apr 14 14:52:07 2015 -0400
Committer: Eric C. Newton <er...@gmail.com>
Committed: Tue Apr 14 14:52:07 2015 -0400
----------------------------------------------------------------------
.../core/tabletserver/log/LogEntry.java | 4 +++
.../server/master/state/MetaDataStateStore.java | 3 ++
.../accumulo/server/replication/StatusUtil.java | 12 +++++---
.../gc/GarbageCollectWriteAheadLogs.java | 30 ++++++++++++++------
.../apache/accumulo/tserver/TabletServer.java | 4 +--
.../apache/accumulo/tserver/log/DfsLogger.java | 1 +
.../tserver/log/TabletServerLogger.java | 10 +++++--
7 files changed, 46 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9c2ca7a5/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java b/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java
index 964e3b3..90ce692 100644
--- a/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java
+++ b/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java
@@ -81,6 +81,10 @@ public class LogEntry {
static private final Text EMPTY_TEXT = new Text();
public static LogEntry fromKeyValue(Key key, Value value) {
+ String qualifier = key.getColumnQualifier().toString();
+ if (qualifier.indexOf('/') < 1) {
+ throw new IllegalArgumentException("Bad key for log entry: " + key);
+ }
KeyExtent extent = new KeyExtent(key.getRow(), EMPTY_TEXT);
String[] parts = key.getColumnQualifier().toString().split("/", 2);
String server = parts[0];
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9c2ca7a5/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
index adcf04d..c154bd0 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
@@ -168,6 +168,9 @@ public class MetaDataStateStore extends TabletStateStore {
BatchWriter writer = createBatchWriter();
try {
for (Entry<TServerInstance,List<Path>> entry : logs.entrySet()) {
+ if (entry.getValue().isEmpty()) {
+ continue;
+ }
Mutation m = new Mutation(MetadataSchema.CurrentLogsSection.getRowPrefix() + entry.getKey().toString());
for (Path log : entry.getValue()) {
m.put(MetadataSchema.CurrentLogsSection.COLF, new Text(log.toString()), MetadataSchema.CurrentLogsSection.UNUSED);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9c2ca7a5/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java b/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java
index e973ebc..d72eea2 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java
@@ -32,7 +32,6 @@ public class StatusUtil {
private static final Value INF_END_REPLICATION_STATUS_VALUE, CLOSED_STATUS_VALUE;
private static final Status.Builder CREATED_STATUS_BUILDER;
- private static final Status.Builder INF_END_REPLICATION_STATUS_BUILDER;
static {
CREATED_STATUS_BUILDER = Status.newBuilder();
@@ -46,7 +45,6 @@ public class StatusUtil {
builder.setEnd(0);
builder.setInfiniteEnd(true);
builder.setClosed(false);
- INF_END_REPLICATION_STATUS_BUILDER = builder;
INF_END_REPLICATION_STATUS = builder.build();
INF_END_REPLICATION_STATUS_VALUE = ProtobufUtil.toValue(INF_END_REPLICATION_STATUS);
@@ -155,8 +153,14 @@ public class StatusUtil {
/**
* @return A {@link Status} for an open file of unspecified length, all of which needs replicating.
*/
- public static synchronized Status openWithUnknownLength(long timeCreated) {
- return INF_END_REPLICATION_STATUS_BUILDER.setCreatedTime(timeCreated).build();
+ public static Status openWithUnknownLength(long timeCreated) {
+ Builder builder = Status.newBuilder();
+ builder.setBegin(0);
+ builder.setEnd(0);
+ builder.setInfiniteEnd(true);
+ builder.setClosed(false);
+ builder.setCreatedTime(timeCreated);
+ return builder.build();
}
/**
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9c2ca7a5/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index 59612ab..9f537af 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.UUID;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -130,9 +131,9 @@ public class GarbageCollectWriteAheadLogs {
status.currentLog.candidates = count;
span.stop();
- span = Trace.start("removeMetadataEntries");
+ span = Trace.start("removeEntriesInUse");
try {
- count = removeMetadataEntries(candidates, status, currentServers);
+ count = removeEntriesInUse(candidates, status, currentServers);
} catch (Exception ex) {
log.error("Unable to scan metadata table", ex);
return;
@@ -165,7 +166,7 @@ public class GarbageCollectWriteAheadLogs {
span.stop();
span = Trace.start("removeMarkers");
- count = removeMarkers(candidates);
+ count = removeTabletServerMarkers(candidates);
long removeMarkersStop = System.currentTimeMillis();
log.info(String.format("%d markers removed in %.2f seconds", count, (removeMarkersStop - removeStop) / 1000.));
span.stop();
@@ -182,7 +183,7 @@ public class GarbageCollectWriteAheadLogs {
}
}
- private long removeMarkers(Map<TServerInstance,Set<Path>> candidates) {
+ private long removeTabletServerMarkers(Map<TServerInstance,Set<Path>> candidates) {
long result = 0;
try {
BatchWriter root = null;
@@ -231,15 +232,19 @@ public class GarbageCollectWriteAheadLogs {
return status.currentLog.deleted;
}
- private long removeMetadataEntries(Map<TServerInstance, Set<Path> > candidates, GCStatus status, Set<TServerInstance> liveServers) throws IOException, KeeperException,
+ private UUID path2uuid(Path path) {
+ return UUID.fromString(path.getName());
+ }
+
+ private long removeEntriesInUse(Map<TServerInstance, Set<Path> > candidates, GCStatus status, Set<TServerInstance> liveServers) throws IOException, KeeperException,
InterruptedException {
// remove any entries if there's a log reference, or a tablet is still assigned to the dead server
- Map<Path, TServerInstance> walToDeadServer = new HashMap<>();
+ Map<UUID, TServerInstance> walToDeadServer = new HashMap<>();
for (Entry<TServerInstance,Set<Path>> entry : candidates.entrySet()) {
for (Path file : entry.getValue()) {
- walToDeadServer.put(file, entry.getKey());
+ walToDeadServer.put(path2uuid(file), entry.getKey());
}
}
long count = 0;
@@ -254,9 +259,16 @@ public class GarbageCollectWriteAheadLogs {
}
for (Collection<String> wals : state.walogs) {
for (String wal : wals) {
- TServerInstance dead = walToDeadServer.get(new Path(wal));
+ UUID walUUID = path2uuid(new Path(wal));
+ TServerInstance dead = walToDeadServer.get(walUUID);
if (dead != null) {
- candidates.get(dead).remove(wal);
+ Iterator<Path> iter = candidates.get(dead).iterator();
+ while (iter.hasNext()) {
+ if (path2uuid(iter.next()).equals(walUUID)) {
+ iter.remove();
+ break;
+ }
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9c2ca7a5/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index e28d472..af45c14 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -2893,7 +2893,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
public void minorCompactionFinished(CommitSession tablet, String newDatafile, int walogSeq) throws IOException {
totalMinorCompactions.incrementAndGet();
logger.minorCompactionFinished(tablet, newDatafile, walogSeq);
- removeUnusedWALs();
+ markUnusedWALs();
}
public void minorCompactionStarted(CommitSession tablet, int lastUpdateSequence, String newMapfileLocation) throws IOException {
@@ -3000,7 +3000,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
// remove any meta entries after a rolled log is no longer referenced
Set<DfsLogger> closedLogs = new HashSet<>();
- private void removeUnusedWALs() {
+ private void markUnusedWALs() {
Set<DfsLogger> candidates;
synchronized (closedLogs) {
candidates = new HashSet<>(closedLogs);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9c2ca7a5/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index e256604..6e9cdf9 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -383,6 +383,7 @@ public class DfsLogger implements Comparable<DfsLogger> {
public synchronized void open(String address) throws IOException {
String filename = UUID.randomUUID().toString();
+ log.debug("Address is " + address);
String logger = Joiner.on("+").join(address.split(":"));
log.debug("DfsLogger.open() begin");
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9c2ca7a5/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 04e7a83..dd54798 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -79,7 +79,7 @@ public class TabletServerLogger {
// The current logger
private DfsLogger currentLog = null;
private final SynchronousQueue<DfsLogger> nextLog = new SynchronousQueue<>();
- private final ThreadPoolExecutor nextLogMaker = new SimpleThreadPool(1, "WALog creator");
+ private ThreadPoolExecutor nextLogMaker;
// The current generation of logs.
// Because multiple threads can be using a log at one time, a log
@@ -150,7 +150,6 @@ public class TabletServerLogger {
this.maxSize = maxSize;
this.syncCounter = syncCounter;
this.flushCounter = flushCounter;
- startLogMaker();
}
private DfsLogger initializeLoggers(final AtomicInteger logIdOut) throws IOException {
@@ -200,6 +199,7 @@ public class TabletServerLogger {
}
try {
+ startLogMaker();
DfsLogger next = nextLog.take();
log.info("Using next log " + next.getFileName());
currentLog = next;
@@ -214,7 +214,11 @@ public class TabletServerLogger {
}
}
- private void startLogMaker() {
+ private synchronized void startLogMaker() {
+ if (nextLogMaker != null) {
+ return;
+ }
+ nextLogMaker = new SimpleThreadPool(1, "WALog creator");
nextLogMaker.submit(new Runnable() {
@Override
public void run() {