You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/06/05 06:43:03 UTC
[21/35] git commit: ACCUMULO-2041 finished state management in Tablet
ACCUMULO-2041 finished state management in Tablet
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f280e971
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f280e971
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f280e971
Branch: refs/heads/ACCUMULO-378
Commit: f280e9713ca3016cec3c082321774d579c86d51e
Parents: 731abce
Author: Eric C. Newton <er...@gmail.com>
Authored: Tue Jun 3 16:42:43 2014 -0400
Committer: Eric C. Newton <er...@gmail.com>
Committed: Tue Jun 3 16:42:43 2014 -0400
----------------------------------------------------------------------
.../accumulo/server/tablets/TabletTime.java | 1 -
.../accumulo/tserver/TabletStatsKeeper.java | 1 +
.../accumulo/tserver/tablet/CompactionInfo.java | 16 +++
.../apache/accumulo/tserver/tablet/Tablet.java | 142 +++++++++----------
4 files changed, 85 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f280e971/server/base/src/main/java/org/apache/accumulo/server/tablets/TabletTime.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/tablets/TabletTime.java b/server/base/src/main/java/org/apache/accumulo/server/tablets/TabletTime.java
index 7f6dcf7..e3fd8f3 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/tablets/TabletTime.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/tablets/TabletTime.java
@@ -45,7 +45,6 @@ public abstract class TabletTime {
public abstract String getMetadataValue();
- // abstract long setUpdateTimes(Mutation mutation);
public abstract long setUpdateTimes(List<Mutation> mutations);
public abstract long getTime();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f280e971/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletStatsKeeper.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletStatsKeeper.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletStatsKeeper.java
index d914ac6..40906df 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletStatsKeeper.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletStatsKeeper.java
@@ -22,6 +22,7 @@ import org.apache.accumulo.server.util.ActionStatsUpdator;
public class TabletStatsKeeper {
+ // suspect we need more synchronization in this class
private ActionStats major = new ActionStats();
private ActionStats minor = new ActionStats();
private ActionStats split = new ActionStats();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f280e971/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionInfo.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionInfo.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionInfo.java
index ab57d65..8e9fb9b 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionInfo.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionInfo.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.accumulo.tserver.tablet;
import java.util.ArrayList;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f280e971/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index dc2fc4d..2be00fe 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -143,7 +143,7 @@ import org.apache.zookeeper.KeeperException.NoNodeException;
*
*/
public class Tablet implements TabletCommitter {
- static final Logger log = Logger.getLogger(Tablet.class);
+ static private final Logger log = Logger.getLogger(Tablet.class);
static private final List<LogEntry> NO_LOG_ENTRIES = Collections.emptyList();
private final TabletServer tabletServer;
@@ -166,23 +166,27 @@ public class Tablet implements TabletCommitter {
private final AtomicLong dataSourceDeletions = new AtomicLong(0);
public long getDataSourceDeletions() { return dataSourceDeletions.get(); }
private final Set<ScanDataSource> activeScans = new HashSet<ScanDataSource>();
+
+ private static enum CloseState {
+ OPEN,
+ CLOSING,
+ CLOSED,
+ COMPLETE
+ }
- private volatile boolean closing = false;
- private boolean closed = false;
- private boolean closeComplete = false;
+ private volatile CloseState closeState = CloseState.OPEN;
private boolean updatingFlushID = false;
private long lastFlushID = -1;
private long lastCompactID = -1;
- private volatile boolean majorCompactionInProgress = false;
- private volatile boolean majorCompactionWaitingToStart = false;
- private Set<MajorCompactionReason> majorCompactionQueued = Collections.synchronizedSet(EnumSet.noneOf(MajorCompactionReason.class));
-
- private volatile boolean minorCompactionInProgress = false;
- private volatile boolean minorCompactionWaitingToStart = false;
+ static enum CompactionState { WAITING_TO_START, IN_PROGRESS };
+ private volatile CompactionState minorCompactionState = null;
+ private volatile CompactionState majorCompactionState = null;
+ private final Set<MajorCompactionReason> majorCompactionQueued = Collections.synchronizedSet(EnumSet.noneOf(MajorCompactionReason.class));
+
private final AtomicReference<ConstraintChecker> constraintChecker = new AtomicReference<ConstraintChecker>();
private int writesInProgress = 0;
@@ -220,7 +224,7 @@ public class Tablet implements TabletCommitter {
return logId;
}
- public class LookupResult {
+ public static class LookupResult {
public List<Range> unfinishedRanges = new ArrayList<Range>();
public long bytesAdded = 0;
public long dataSize = 0;
@@ -228,7 +232,7 @@ public class Tablet implements TabletCommitter {
}
FileRef getNextMapFilename(String prefix) throws IOException {
- String extension = FileOperations.getNewFileExtension(this.tableConfiguration);
+ String extension = FileOperations.getNewFileExtension(tableConfiguration);
checkTabletDir();
return new FileRef(location.toString() + "/" + prefix + UniqueNameAllocator.getInstance().getNextName() + "." + extension);
}
@@ -237,18 +241,18 @@ public class Tablet implements TabletCommitter {
if (!tableDirChecked) {
FileStatus[] files = null;
try {
- files = getTabletServer().getFileSystem().listStatus(this.location);
+ files = getTabletServer().getFileSystem().listStatus(location);
} catch (FileNotFoundException ex) {
// ignored
}
if (files == null) {
- if (this.location.getName().startsWith("c-"))
- log.debug("Tablet " + extent + " had no dir, creating " + this.location); // its a clone dir...
+ if (location.getName().startsWith("c-"))
+ log.debug("Tablet " + extent + " had no dir, creating " + location); // its a clone dir...
else
- log.warn("Tablet " + extent + " had no dir, creating " + this.location);
+ log.warn("Tablet " + extent + " had no dir, creating " + location);
- getTabletServer().getFileSystem().mkdirs(this.location);
+ getTabletServer().getFileSystem().mkdirs(location);
}
tableDirChecked = true;
}
@@ -524,7 +528,7 @@ public class Tablet implements TabletCommitter {
configObserver.propertiesChanged();
if (!logEntries.isEmpty()) {
- log.info("Starting Write-Ahead Log recovery for " + this.extent);
+ log.info("Starting Write-Ahead Log recovery for " + extent);
final long[] count = new long[2];
final CommitSession commitSession = getTabletMemory().getCommitSession();
count[1] = Long.MIN_VALUE;
@@ -942,7 +946,7 @@ public class Tablet implements TabletCommitter {
if (lastFlushID >= tableFlushID)
return;
- if (closing || closed || getTabletMemory().memoryReservedForMinC())
+ if (isClosing() || isClosed() || getTabletMemory().memoryReservedForMinC())
return;
if (getTabletMemory().getMemTable().getNumEntries() == 0) {
@@ -1022,15 +1026,14 @@ public class Tablet implements TabletCommitter {
synchronized (this) {
t1 = System.currentTimeMillis();
- if (closing || closed || majorCompactionWaitingToStart || getTabletMemory().memoryReservedForMinC() || getTabletMemory().getMemTable().getNumEntries() == 0
+ if (isClosing() || isClosed() || majorCompactionState == CompactionState.WAITING_TO_START || getTabletMemory().memoryReservedForMinC() || getTabletMemory().getMemTable().getNumEntries() == 0
|| updatingFlushID) {
logMessage = new StringBuilder();
logMessage.append(extent.toString());
- logMessage.append(" closing " + closing);
- logMessage.append(" closed " + closed);
- logMessage.append(" majorCompactionWaitingToStart " + majorCompactionWaitingToStart);
+ logMessage.append(" closeState " + closeState);
+ logMessage.append(" majorCompactionState " + majorCompactionState);
if (getTabletMemory() != null)
logMessage.append(" tabletMemory.memoryReservedForMinC() " + getTabletMemory().memoryReservedForMinC());
if (getTabletMemory() != null && getTabletMemory().getMemTable() != null)
@@ -1145,8 +1148,7 @@ public class Tablet implements TabletCommitter {
throw new IllegalStateException("waitingForLogs < 0 " + writesInProgress);
}
- if (closed || getTabletMemory() == null) {
- // log.debug("tablet closed, can't commit");
+ if (isClosed() || getTabletMemory() == null) {
return null;
}
@@ -1217,7 +1219,7 @@ public class Tablet implements TabletCommitter {
throw new IllegalStateException("waitingForLogs <= 0 " + writesInProgress);
}
- if (closeComplete || getTabletMemory() == null) {
+ if (isCloseComplete() || getTabletMemory() == null) {
throw new IllegalStateException("aborting commit when tablet is closed");
}
@@ -1245,7 +1247,7 @@ public class Tablet implements TabletCommitter {
throw new IllegalStateException("commiting mutations after logging, but not waiting for any log messages");
}
- if (closed && closeComplete) {
+ if (isCloseComplete()) {
throw new IllegalStateException("tablet closed with outstanding messages to the logger");
}
@@ -1284,28 +1286,24 @@ public class Tablet implements TabletCommitter {
MinorCompactionTask mct = null;
synchronized (this) {
- if (closed || closing || closeComplete) {
- String msg = "Tablet " + getExtent() + " already";
- if (closed)
- msg += " closed";
- if (closing)
- msg += " closing";
- if (closeComplete)
- msg += " closeComplete";
+ if (isClosed() || isClosing() || isCloseComplete()) {
+ String msg = "Tablet " + getExtent() + " already " + closeState;
throw new IllegalStateException(msg);
}
// enter the closing state, no splits, minor, or major compactions can start
// should cause running major compactions to stop
- closing = true;
+ closeState = CloseState.CLOSING;
this.notifyAll();
// determines if inserts and queries can still continue while minor compacting
- closed = disableWrites;
+ if (disableWrites) {
+ closeState = CloseState.CLOSING;
+ }
// wait for major compactions to finish, setting closing to
// true should cause any running major compactions to abort
- while (majorCompactionInProgress) {
+ while (majorCompactionRunning()) {
try {
this.wait(50);
} catch (InterruptedException e) {
@@ -1349,9 +1347,8 @@ public class Tablet implements TabletCommitter {
synchronized void completeClose(boolean saveState, boolean completeClose) throws IOException {
- if (!closing || closeComplete || closeCompleting) {
- throw new IllegalStateException("closing = " + closing + " closed = " + closed + " closeComplete = " + closeComplete + " closeCompleting = "
- + closeCompleting);
+ if (!isClosing() || isCloseComplete() || closeCompleting) {
+ throw new IllegalStateException("closeState = " + closeState);
}
log.debug("completeClose(saveState=" + saveState + " completeClose=" + completeClose + ") " + getExtent());
@@ -1359,7 +1356,7 @@ public class Tablet implements TabletCommitter {
// ensure this method is only called once, also guards against multiple
// threads entering the method at the same time
closeCompleting = true;
- closed = true;
+ closeState = CloseState.CLOSED;
// modify dataSourceDeletions so scans will try to switch data sources and fail because the tablet is closed
dataSourceDeletions.incrementAndGet();
@@ -1422,7 +1419,8 @@ public class Tablet implements TabletCommitter {
tableConfiguration.getNamespaceConfiguration().removeObserver(configObserver);
tableConfiguration.removeObserver(configObserver);
- closeComplete = completeClose;
+ if (completeClose)
+ closeState = CloseState.COMPLETE;
}
private void closeConsistencyCheck() {
@@ -1491,7 +1489,7 @@ public class Tablet implements TabletCommitter {
public synchronized boolean initiateMajorCompaction(MajorCompactionReason reason) {
- if (closing || closed || !needsMajorCompaction(reason) || majorCompactionInProgress || majorCompactionQueued.contains(reason)) {
+ if (isClosing() || isClosed() || !needsMajorCompaction(reason) || majorCompactionRunning() || majorCompactionQueued.contains(reason)) {
return false;
}
@@ -1507,7 +1505,7 @@ public class Tablet implements TabletCommitter {
*
*/
public boolean needsMajorCompaction(MajorCompactionReason reason) {
- if (majorCompactionInProgress)
+ if (majorCompactionRunning())
return false;
if (reason == MajorCompactionReason.CHOP || reason == MajorCompactionReason.USER)
return true;
@@ -1678,7 +1676,7 @@ public class Tablet implements TabletCommitter {
public synchronized boolean needsSplit() {
boolean ret;
- if (closing || closed)
+ if (isClosing() || isClosed())
ret = false;
else
ret = findSplitRow(getDatafileManager().getFiles()) != null;
@@ -1689,7 +1687,7 @@ public class Tablet implements TabletCommitter {
// BEGIN PRIVATE METHODS RELATED TO MAJOR COMPACTION
private boolean isCompactionEnabled() {
- return !closing && !getTabletServer().isMajorCompactionDisabled();
+ return !isClosing() && !getTabletServer().isMajorCompactionDisabled();
}
private CompactionStats _majorCompact(MajorCompactionReason reason) throws IOException, CompactionCanceledException {
@@ -1725,13 +1723,13 @@ public class Tablet implements TabletCommitter {
t1 = System.currentTimeMillis();
- majorCompactionWaitingToStart = true;
+ majorCompactionState = CompactionState.WAITING_TO_START;
getTabletMemory().waitForMinC();
t2 = System.currentTimeMillis();
- majorCompactionWaitingToStart = false;
+ majorCompactionState = null;
notifyAll();
VolumeManager fs = getTabletServer().getFileSystem();
@@ -1953,11 +1951,11 @@ public class Tablet implements TabletCommitter {
// check that compaction is still needed - defer to splitting
majorCompactionQueued.remove(reason);
- if (closing || closed || !needsMajorCompaction(reason) || majorCompactionInProgress || needsSplit()) {
+ if (isClosing() || isClosed ()|| !needsMajorCompaction(reason) || majorCompactionRunning() || needsSplit()) {
return null;
}
- majorCompactionInProgress = true;
+ majorCompactionState = CompactionState.WAITING_TO_START;
}
try {
@@ -1978,7 +1976,7 @@ public class Tablet implements TabletCommitter {
// ensure we always reset boolean, even
// when an exception is thrown
synchronized (this) {
- majorCompactionInProgress = false;
+ majorCompactionState = null;
this.notifyAll();
}
@@ -2033,27 +2031,27 @@ public class Tablet implements TabletCommitter {
}
public synchronized boolean isClosing() {
- return closing;
+ return closeState == CloseState.CLOSING;
}
public synchronized boolean isClosed() {
- return closed;
+ return closeState == CloseState.CLOSED;
}
public synchronized boolean isCloseComplete() {
- return closeComplete;
+ return closeState == CloseState.COMPLETE;
}
public boolean majorCompactionRunning() {
- return this.majorCompactionInProgress;
+ return majorCompactionState == CompactionState.IN_PROGRESS;
}
public boolean isMinorCompactionQueued() {
- return minorCompactionWaitingToStart;
+ return minorCompactionState == CompactionState.WAITING_TO_START;
}
public boolean isMinorCompactionRunning() {
- return minorCompactionInProgress;
+ return minorCompactionState == CompactionState.IN_PROGRESS;
}
public boolean isMajorCompactionQueued() {
@@ -2104,11 +2102,11 @@ public class Tablet implements TabletCommitter {
if (splitPoint == null || splitPoint.row == null) {
log.info("had to abort split because splitRow was null");
- closing = false;
+ closeState = CloseState.OPEN;
return null;
}
- closed = true;
+ closeState = CloseState.CLOSING;
completeClose(true, false);
Text midRow = splitPoint.row;
@@ -2151,8 +2149,7 @@ public class Tablet implements TabletCommitter {
log.debug(String.format("offline split time : %6.2f secs", (t2 - t1) / 1000.0));
- closeComplete = true;
-
+ closeState = CloseState.COMPLETE;
return newTablets;
}
}
@@ -2213,7 +2210,7 @@ public class Tablet implements TabletCommitter {
// Don't do it if we spent too long waiting for the lock
long now = System.currentTimeMillis();
synchronized (this) {
- if (closed) {
+ if (isClosed()) {
throw new IOException("tablet " + extent + " is closed");
}
@@ -2328,7 +2325,7 @@ public class Tablet implements TabletCommitter {
try {
synchronized (this) {
- if (closed && closeComplete) {
+ if (isCloseComplete()) {
throw new IllegalStateException("Can not update logs of closed tablet " + extent);
}
@@ -2409,12 +2406,12 @@ public class Tablet implements TabletCommitter {
if (lastCompactID >= compactionId)
return;
- if (closing || closed || majorCompactionQueued.contains(MajorCompactionReason.USER) || majorCompactionInProgress)
+ if (isClosing() || isClosed() || majorCompactionQueued.contains(MajorCompactionReason.USER) || majorCompactionRunning())
return;
if (getDatafileManager().getDatafileSizes().size() == 0) {
// no files, so jsut update the metadata table
- majorCompactionInProgress = true;
+ majorCompactionState = CompactionState.IN_PROGRESS;
updateMetadata = true;
lastCompactID = compactionId;
} else
@@ -2428,7 +2425,7 @@ public class Tablet implements TabletCommitter {
MetadataTableUtil.updateTabletCompactID(extent, compactionId, SystemCredentials.get(), getTabletServer().getLock());
} finally {
synchronized (this) {
- majorCompactionInProgress = false;
+ majorCompactionState = null;
this.notifyAll();
}
}
@@ -2535,20 +2532,19 @@ public class Tablet implements TabletCommitter {
}
public void minorCompactionWaitingToStart() {
- minorCompactionWaitingToStart = true;
+ minorCompactionState = CompactionState.WAITING_TO_START;
}
public void minorCompactionStarted() {
- minorCompactionWaitingToStart = false;
- minorCompactionInProgress = true;
+ minorCompactionState = CompactionState.IN_PROGRESS;
}
public void minorCompactionComplete() {
- minorCompactionInProgress = false;
+ minorCompactionState = null;
}
public boolean isMajorCompactionRunning() {
- return majorCompactionInProgress;
+ return majorCompactionState == CompactionState.IN_PROGRESS;
}
public TabletStats getTabletStats() {
@@ -2558,6 +2554,4 @@ public class Tablet implements TabletCommitter {
public AtomicLong getScannedCounter() {
return scannedCount;
}
-
-
}