You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/06/05 06:43:03 UTC

[21/35] git commit: ACCUMULO-2041 finished state management in Tablet

ACCUMULO-2041 finished state management in Tablet


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f280e971
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f280e971
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f280e971

Branch: refs/heads/ACCUMULO-378
Commit: f280e9713ca3016cec3c082321774d579c86d51e
Parents: 731abce
Author: Eric C. Newton <er...@gmail.com>
Authored: Tue Jun 3 16:42:43 2014 -0400
Committer: Eric C. Newton <er...@gmail.com>
Committed: Tue Jun 3 16:42:43 2014 -0400

----------------------------------------------------------------------
 .../accumulo/server/tablets/TabletTime.java     |   1 -
 .../accumulo/tserver/TabletStatsKeeper.java     |   1 +
 .../accumulo/tserver/tablet/CompactionInfo.java |  16 +++
 .../apache/accumulo/tserver/tablet/Tablet.java  | 142 +++++++++----------
 4 files changed, 85 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/f280e971/server/base/src/main/java/org/apache/accumulo/server/tablets/TabletTime.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/tablets/TabletTime.java b/server/base/src/main/java/org/apache/accumulo/server/tablets/TabletTime.java
index 7f6dcf7..e3fd8f3 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/tablets/TabletTime.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/tablets/TabletTime.java
@@ -45,7 +45,6 @@ public abstract class TabletTime {
   
   public abstract String getMetadataValue();
   
-  // abstract long setUpdateTimes(Mutation mutation);
   public abstract long setUpdateTimes(List<Mutation> mutations);
   
   public abstract long getTime();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f280e971/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletStatsKeeper.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletStatsKeeper.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletStatsKeeper.java
index d914ac6..40906df 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletStatsKeeper.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletStatsKeeper.java
@@ -22,6 +22,7 @@ import org.apache.accumulo.server.util.ActionStatsUpdator;
 
 public class TabletStatsKeeper {
   
+  // suspect we need more synchronization in this class
   private ActionStats major = new ActionStats();
   private ActionStats minor = new ActionStats();
   private ActionStats split = new ActionStats();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f280e971/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionInfo.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionInfo.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionInfo.java
index ab57d65..8e9fb9b 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionInfo.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionInfo.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.accumulo.tserver.tablet;
 
 import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f280e971/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index dc2fc4d..2be00fe 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -143,7 +143,7 @@ import org.apache.zookeeper.KeeperException.NoNodeException;
  * 
  */
 public class Tablet implements TabletCommitter {
-  static final Logger log = Logger.getLogger(Tablet.class);
+  static private final Logger log = Logger.getLogger(Tablet.class);
   static private final List<LogEntry> NO_LOG_ENTRIES = Collections.emptyList();
 
   private final TabletServer tabletServer;
@@ -166,23 +166,27 @@ public class Tablet implements TabletCommitter {
   private final AtomicLong dataSourceDeletions = new AtomicLong(0);
   public long getDataSourceDeletions() { return dataSourceDeletions.get(); }
   private final Set<ScanDataSource> activeScans = new HashSet<ScanDataSource>();
+  
+  private static enum CloseState {
+    OPEN,
+    CLOSING,
+    CLOSED,
+    COMPLETE
+  }
 
-  private volatile boolean closing = false;
-  private boolean closed = false;
-  private boolean closeComplete = false;
+  private volatile CloseState closeState = CloseState.OPEN;
 
   private boolean updatingFlushID = false;
 
   private long lastFlushID = -1;
   private long lastCompactID = -1;
   
-  private volatile boolean majorCompactionInProgress = false;
-  private volatile boolean majorCompactionWaitingToStart = false;
-  private Set<MajorCompactionReason> majorCompactionQueued = Collections.synchronizedSet(EnumSet.noneOf(MajorCompactionReason.class));
-  
-  private volatile boolean minorCompactionInProgress = false;
-  private volatile boolean minorCompactionWaitingToStart = false;
+  static enum CompactionState { WAITING_TO_START, IN_PROGRESS };
+  private volatile CompactionState minorCompactionState = null;
+  private volatile CompactionState majorCompactionState = null;
 
+  private final Set<MajorCompactionReason> majorCompactionQueued = Collections.synchronizedSet(EnumSet.noneOf(MajorCompactionReason.class));
+  
   private final AtomicReference<ConstraintChecker> constraintChecker = new AtomicReference<ConstraintChecker>();
 
   private int writesInProgress = 0;
@@ -220,7 +224,7 @@ public class Tablet implements TabletCommitter {
     return logId;
   }
   
-  public class LookupResult {
+  public static class LookupResult {
     public List<Range> unfinishedRanges = new ArrayList<Range>();
     public long bytesAdded = 0;
     public long dataSize = 0;
@@ -228,7 +232,7 @@ public class Tablet implements TabletCommitter {
   }
 
   FileRef getNextMapFilename(String prefix) throws IOException {
-    String extension = FileOperations.getNewFileExtension(this.tableConfiguration);
+    String extension = FileOperations.getNewFileExtension(tableConfiguration);
     checkTabletDir();
     return new FileRef(location.toString() + "/" + prefix + UniqueNameAllocator.getInstance().getNextName() + "." + extension);
   }
@@ -237,18 +241,18 @@ public class Tablet implements TabletCommitter {
     if (!tableDirChecked) {
       FileStatus[] files = null;
       try {
-        files = getTabletServer().getFileSystem().listStatus(this.location);
+        files = getTabletServer().getFileSystem().listStatus(location);
       } catch (FileNotFoundException ex) {
         // ignored
       }
       
       if (files == null) {
-        if (this.location.getName().startsWith("c-"))
-          log.debug("Tablet " + extent + " had no dir, creating " + this.location); // its a clone dir...
+        if (location.getName().startsWith("c-"))
+          log.debug("Tablet " + extent + " had no dir, creating " + location); // its a clone dir...
         else
-          log.warn("Tablet " + extent + " had no dir, creating " + this.location);
+          log.warn("Tablet " + extent + " had no dir, creating " + location);
       
-        getTabletServer().getFileSystem().mkdirs(this.location);
+        getTabletServer().getFileSystem().mkdirs(location);
       }
       tableDirChecked = true;
     }
@@ -524,7 +528,7 @@ public class Tablet implements TabletCommitter {
     configObserver.propertiesChanged();
 
     if (!logEntries.isEmpty()) {
-      log.info("Starting Write-Ahead Log recovery for " + this.extent);
+      log.info("Starting Write-Ahead Log recovery for " + extent);
       final long[] count = new long[2];
       final CommitSession commitSession = getTabletMemory().getCommitSession();
       count[1] = Long.MIN_VALUE;
@@ -942,7 +946,7 @@ public class Tablet implements TabletCommitter {
         if (lastFlushID >= tableFlushID)
           return;
 
-        if (closing || closed || getTabletMemory().memoryReservedForMinC())
+        if (isClosing() || isClosed() || getTabletMemory().memoryReservedForMinC())
           return;
 
         if (getTabletMemory().getMemTable().getNumEntries() == 0) {
@@ -1022,15 +1026,14 @@ public class Tablet implements TabletCommitter {
       synchronized (this) {
         t1 = System.currentTimeMillis();
 
-        if (closing || closed || majorCompactionWaitingToStart || getTabletMemory().memoryReservedForMinC() || getTabletMemory().getMemTable().getNumEntries() == 0
+        if (isClosing() || isClosed() || majorCompactionState == CompactionState.WAITING_TO_START || getTabletMemory().memoryReservedForMinC() || getTabletMemory().getMemTable().getNumEntries() == 0
             || updatingFlushID) {
 
           logMessage = new StringBuilder();
 
           logMessage.append(extent.toString());
-          logMessage.append(" closing " + closing);
-          logMessage.append(" closed " + closed);
-          logMessage.append(" majorCompactionWaitingToStart " + majorCompactionWaitingToStart);
+          logMessage.append(" closeState " + closeState);
+          logMessage.append(" majorCompactionState " + majorCompactionState);
           if (getTabletMemory() != null)
             logMessage.append(" tabletMemory.memoryReservedForMinC() " + getTabletMemory().memoryReservedForMinC());
           if (getTabletMemory() != null && getTabletMemory().getMemTable() != null)
@@ -1145,8 +1148,7 @@ public class Tablet implements TabletCommitter {
       throw new IllegalStateException("waitingForLogs < 0 " + writesInProgress);
     }
 
-    if (closed || getTabletMemory() == null) {
-      // log.debug("tablet closed, can't commit");
+    if (isClosed() || getTabletMemory() == null) {
       return null;
     }
 
@@ -1217,7 +1219,7 @@ public class Tablet implements TabletCommitter {
       throw new IllegalStateException("waitingForLogs <= 0 " + writesInProgress);
     }
 
-    if (closeComplete || getTabletMemory() == null) {
+    if (isCloseComplete() || getTabletMemory() == null) {
       throw new IllegalStateException("aborting commit when tablet is closed");
     }
 
@@ -1245,7 +1247,7 @@ public class Tablet implements TabletCommitter {
         throw new IllegalStateException("commiting mutations after logging, but not waiting for any log messages");
       }
 
-      if (closed && closeComplete) {
+      if (isCloseComplete()) {
         throw new IllegalStateException("tablet closed with outstanding messages to the logger");
       }
 
@@ -1284,28 +1286,24 @@ public class Tablet implements TabletCommitter {
     MinorCompactionTask mct = null;
 
     synchronized (this) {
-      if (closed || closing || closeComplete) {
-        String msg = "Tablet " + getExtent() + " already";
-        if (closed)
-          msg += " closed";
-        if (closing)
-          msg += " closing";
-        if (closeComplete)
-          msg += " closeComplete";
+      if (isClosed() || isClosing() || isCloseComplete()) {
+        String msg = "Tablet " + getExtent() + " already " + closeState;
         throw new IllegalStateException(msg);
       }
 
       // enter the closing state, no splits, minor, or major compactions can start
       // should cause running major compactions to stop
-      closing = true;
+      closeState = CloseState.CLOSING;
       this.notifyAll();
 
       // determines if inserts and queries can still continue while minor compacting
-      closed = disableWrites;
+      if (disableWrites) {
+        closeState = CloseState.CLOSING;
+      }
 
       // wait for major compactions to finish, setting closing to
       // true should cause any running major compactions to abort
-      while (majorCompactionInProgress) {
+      while (majorCompactionRunning()) {
         try {
           this.wait(50);
         } catch (InterruptedException e) {
@@ -1349,9 +1347,8 @@ public class Tablet implements TabletCommitter {
 
   synchronized void completeClose(boolean saveState, boolean completeClose) throws IOException {
 
-    if (!closing || closeComplete || closeCompleting) {
-      throw new IllegalStateException("closing = " + closing + " closed = " + closed + " closeComplete = " + closeComplete + " closeCompleting = "
-          + closeCompleting);
+    if (!isClosing() || isCloseComplete() || closeCompleting) {
+      throw new IllegalStateException("closeState = " + closeState);
     }
 
     log.debug("completeClose(saveState=" + saveState + " completeClose=" + completeClose + ") " + getExtent());
@@ -1359,7 +1356,7 @@ public class Tablet implements TabletCommitter {
     // ensure this method is only called once, also guards against multiple
     // threads entering the method at the same time
     closeCompleting = true;
-    closed = true;
+    closeState = CloseState.CLOSED;
 
     // modify dataSourceDeletions so scans will try to switch data sources and fail because the tablet is closed
     dataSourceDeletions.incrementAndGet();
@@ -1422,7 +1419,8 @@ public class Tablet implements TabletCommitter {
     tableConfiguration.getNamespaceConfiguration().removeObserver(configObserver);
     tableConfiguration.removeObserver(configObserver);
 
-    closeComplete = completeClose;
+    if (completeClose)
+      closeState = CloseState.COMPLETE;
   }
 
   private void closeConsistencyCheck() {
@@ -1491,7 +1489,7 @@ public class Tablet implements TabletCommitter {
 
   public synchronized boolean initiateMajorCompaction(MajorCompactionReason reason) {
 
-    if (closing || closed || !needsMajorCompaction(reason) || majorCompactionInProgress || majorCompactionQueued.contains(reason)) {
+    if (isClosing() || isClosed() || !needsMajorCompaction(reason) || majorCompactionRunning() || majorCompactionQueued.contains(reason)) {
       return false;
     }
 
@@ -1507,7 +1505,7 @@ public class Tablet implements TabletCommitter {
    * 
    */
   public boolean needsMajorCompaction(MajorCompactionReason reason) {
-    if (majorCompactionInProgress)
+    if (majorCompactionRunning())
       return false;
     if (reason == MajorCompactionReason.CHOP || reason == MajorCompactionReason.USER)
       return true;
@@ -1678,7 +1676,7 @@ public class Tablet implements TabletCommitter {
   public synchronized boolean needsSplit() {
     boolean ret;
 
-    if (closing || closed)
+    if (isClosing() || isClosed())
       ret = false;
     else
       ret = findSplitRow(getDatafileManager().getFiles()) != null;
@@ -1689,7 +1687,7 @@ public class Tablet implements TabletCommitter {
   // BEGIN PRIVATE METHODS RELATED TO MAJOR COMPACTION
 
   private boolean isCompactionEnabled() {
-    return !closing && !getTabletServer().isMajorCompactionDisabled();
+    return !isClosing() && !getTabletServer().isMajorCompactionDisabled();
   }
 
   private CompactionStats _majorCompact(MajorCompactionReason reason) throws IOException, CompactionCanceledException {
@@ -1725,13 +1723,13 @@ public class Tablet implements TabletCommitter {
 
       t1 = System.currentTimeMillis();
 
-      majorCompactionWaitingToStart = true;
+      majorCompactionState = CompactionState.WAITING_TO_START;
 
       getTabletMemory().waitForMinC();
 
       t2 = System.currentTimeMillis();
 
-      majorCompactionWaitingToStart = false;
+      majorCompactionState = null;
       notifyAll();
 
       VolumeManager fs = getTabletServer().getFileSystem();
@@ -1953,11 +1951,11 @@ public class Tablet implements TabletCommitter {
         // check that compaction is still needed - defer to splitting
         majorCompactionQueued.remove(reason);
 
-        if (closing || closed || !needsMajorCompaction(reason) || majorCompactionInProgress || needsSplit()) {
+        if (isClosing() || isClosed ()|| !needsMajorCompaction(reason) || majorCompactionRunning() || needsSplit()) {
           return null;
         }
 
-        majorCompactionInProgress = true;
+        majorCompactionState = CompactionState.WAITING_TO_START;
       }
 
       try {
@@ -1978,7 +1976,7 @@ public class Tablet implements TabletCommitter {
         // ensure we always reset boolean, even
         // when an exception is thrown
         synchronized (this) {
-          majorCompactionInProgress = false;
+          majorCompactionState = null;
           this.notifyAll();
         }
 
@@ -2033,27 +2031,27 @@ public class Tablet implements TabletCommitter {
   }
 
   public synchronized boolean isClosing() {
-    return closing;
+    return closeState == CloseState.CLOSING;
   }
 
   public synchronized boolean isClosed() {
-    return closed;
+    return closeState == CloseState.CLOSED;
   }
 
   public synchronized boolean isCloseComplete() {
-    return closeComplete;
+    return closeState == CloseState.COMPLETE;
   }
 
   public boolean majorCompactionRunning() {
-    return this.majorCompactionInProgress;
+    return majorCompactionState == CompactionState.IN_PROGRESS;
   }
 
   public boolean isMinorCompactionQueued() {
-    return minorCompactionWaitingToStart;
+    return minorCompactionState == CompactionState.WAITING_TO_START;
   }
 
   public boolean isMinorCompactionRunning() {
-    return minorCompactionInProgress;
+    return minorCompactionState == CompactionState.IN_PROGRESS; 
   }
 
   public boolean isMajorCompactionQueued() {
@@ -2104,11 +2102,11 @@ public class Tablet implements TabletCommitter {
 
       if (splitPoint == null || splitPoint.row == null) {
         log.info("had to abort split because splitRow was null");
-        closing = false;
+        closeState = CloseState.OPEN;
         return null;
       }
 
-      closed = true;
+      closeState = CloseState.CLOSING;
       completeClose(true, false);
 
       Text midRow = splitPoint.row;
@@ -2151,8 +2149,7 @@ public class Tablet implements TabletCommitter {
 
       log.debug(String.format("offline split time : %6.2f secs", (t2 - t1) / 1000.0));
 
-      closeComplete = true;
-
+      closeState = CloseState.COMPLETE;
       return newTablets;
     }
   }
@@ -2213,7 +2210,7 @@ public class Tablet implements TabletCommitter {
     // Don't do it if we spent too long waiting for the lock
     long now = System.currentTimeMillis();
     synchronized (this) {
-      if (closed) {
+      if (isClosed()) {
         throw new IOException("tablet " + extent + " is closed");
       }
 
@@ -2328,7 +2325,7 @@ public class Tablet implements TabletCommitter {
     try {
       synchronized (this) {
 
-        if (closed && closeComplete) {
+        if (isCloseComplete()) {
           throw new IllegalStateException("Can not update logs of closed tablet " + extent);
         }
 
@@ -2409,12 +2406,12 @@ public class Tablet implements TabletCommitter {
       if (lastCompactID >= compactionId)
         return;
 
-      if (closing || closed || majorCompactionQueued.contains(MajorCompactionReason.USER) || majorCompactionInProgress)
+      if (isClosing() || isClosed() || majorCompactionQueued.contains(MajorCompactionReason.USER) || majorCompactionRunning())
         return;
 
       if (getDatafileManager().getDatafileSizes().size() == 0) {
         // no files, so jsut update the metadata table
-        majorCompactionInProgress = true;
+        majorCompactionState = CompactionState.IN_PROGRESS;
         updateMetadata = true;
         lastCompactID = compactionId;
       } else
@@ -2428,7 +2425,7 @@ public class Tablet implements TabletCommitter {
         MetadataTableUtil.updateTabletCompactID(extent, compactionId, SystemCredentials.get(), getTabletServer().getLock());
       } finally {
         synchronized (this) {
-          majorCompactionInProgress = false;
+          majorCompactionState = null;
           this.notifyAll();
         }
       }
@@ -2535,20 +2532,19 @@ public class Tablet implements TabletCommitter {
   }
 
   public void minorCompactionWaitingToStart() {
-    minorCompactionWaitingToStart = true;
+    minorCompactionState = CompactionState.WAITING_TO_START;
   }
 
   public void minorCompactionStarted() {
-    minorCompactionWaitingToStart = false;
-    minorCompactionInProgress = true;
+    minorCompactionState = CompactionState.IN_PROGRESS;
   }
 
   public void minorCompactionComplete() {
-    minorCompactionInProgress = false;
+    minorCompactionState = null;
   }
 
   public boolean isMajorCompactionRunning() {
-    return majorCompactionInProgress;
+    return majorCompactionState == CompactionState.IN_PROGRESS;
   }
 
   public TabletStats getTabletStats() {
@@ -2558,6 +2554,4 @@ public class Tablet implements TabletCommitter {
   public AtomicLong getScannedCounter() {
     return scannedCount;
   }
-
-  
 }