You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2018/01/25 22:32:51 UTC

[GitHub] ivakegg closed pull request #356: ACCUMULO-4777 Removed the unused sequence generator.

ivakegg closed pull request #356: ACCUMULO-4777 Removed the unused sequence generator.
URL: https://github.com/apache/accumulo/pull/356
 
 
   

This pull request was merged from a forked repository. Because GitHub does
not display the original diff of a fork's pull request once it has been
merged, the diff is reproduced below for the sake of provenance:

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 08232234db..26762fbd85 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -263,13 +263,13 @@
       "The maximum size for each write-ahead log. See comment for property tserver.memory.maps.max"),
   TSERV_WALOG_MAX_AGE("tserver.walog.max.age", "24h", PropertyType.TIMEDURATION, "The maximum age for each write-ahead log."),
   TSERV_WALOG_TOLERATED_CREATION_FAILURES("tserver.walog.tolerated.creation.failures", "50", PropertyType.COUNT,
-      "The maximum number of failures tolerated when creating a new WAL file within the period specified by tserver.walog.failures.period."
-          + " Exceeding this number of failures in the period causes the TabletServer to exit."),
+      "The maximum number of failures tolerated when creating a new WAL file.  Values < 0 will allow unlimited creation failures."
+          + " Exceeding this number of failures consecutively trying to create a new WAL causes the TabletServer to exit."),
   TSERV_WALOG_TOLERATED_WAIT_INCREMENT("tserver.walog.tolerated.wait.increment", "1000ms", PropertyType.TIMEDURATION,
-      "The amount of time to wait between failures to create a WALog."),
+      "The amount of time to wait between failures to create or write a WALog."),
   // Never wait longer than 5 mins for a retry
   TSERV_WALOG_TOLERATED_MAXIMUM_WAIT_DURATION("tserver.walog.maximum.wait.duration", "5m", PropertyType.TIMEDURATION,
-      "The maximum amount of time to wait after a failure to create a WAL file."),
+      "The maximum amount of time to wait after a failure to create or write a WAL file."),
   TSERV_MAJC_DELAY("tserver.compaction.major.delay", "30s", PropertyType.TIMEDURATION,
       "Time a tablet server will sleep between checking which tablets need compaction."),
   TSERV_MAJC_THREAD_MAXOPEN("tserver.compaction.major.thread.files.open.max", "10", PropertyType.COUNT,
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/Retry.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/Retry.java
index e84b1af325..1f55d72fe6 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/Retry.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/Retry.java
@@ -25,12 +25,14 @@
 public class Retry {
   private static final Logger log = LoggerFactory.getLogger(Retry.class);
 
+  public static final long MAX_RETRY_DISABLED = -1;
+
   private long maxRetries, maxWait, waitIncrement;
   private long retriesDone, currentWait;
 
   /**
    * @param maxRetries
-   *          Maximum times to retry
+   *          Maximum times to retry or MAX_RETRY_DISABLED if no maximum
    * @param startWait
    *          The amount of time (ms) to wait for the initial retry
    * @param maxWait
@@ -46,6 +48,18 @@ public Retry(long maxRetries, long startWait, long waitIncrement, long maxWait)
     this.currentWait = startWait;
   }
 
+  /**
+   * @param startWait
+   *          The amount of time (ms) to wait for the initial retry
+   * @param maxWait
+   *          The maximum wait (ms)
+   * @param waitIncrement
+   *          The amount of time (ms) to increment next wait time by
+   */
+  public Retry(long startWait, long waitIncrement, long maxWait) {
+    this(MAX_RETRY_DISABLED, startWait, waitIncrement, maxWait);
+  }
+
   // Visible for testing
   long getMaxRetries() {
     return maxRetries;
@@ -86,8 +100,12 @@ void setMaxWait(long maxWait) {
     this.maxWait = maxWait;
   }
 
+  public boolean isMaxRetryDisabled() {
+    return maxRetries < 0;
+  }
+
   public boolean canRetry() {
-    return retriesDone < maxRetries;
+    return isMaxRetryDisabled() || (retriesDone < maxRetries);
   }
 
   public void useRetry() {
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/RetryFactory.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/RetryFactory.java
index 63a1241700..aa6da20fa8 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/RetryFactory.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/RetryFactory.java
@@ -25,6 +25,18 @@
 
   private final long maxRetries, startWait, maxWait, waitIncrement;
 
+  /**
+   * Create a retry factory for retries with a limit
+   *
+   * @param maxRetries
+   *          The maximum number of retries
+   * @param startWait
+   *          The wait ms for the first retry
+   * @param waitIncrement
+   *          The amount of ms to increment the wait on subsequent retries
+   * @param maxWait
+   *          The max amount of wait time between retries
+   */
   public RetryFactory(long maxRetries, long startWait, long waitIncrement, long maxWait) {
     this.maxRetries = maxRetries;
     this.startWait = startWait;
@@ -32,6 +44,23 @@ public RetryFactory(long maxRetries, long startWait, long waitIncrement, long ma
     this.waitIncrement = waitIncrement;
   }
 
+  /**
+   * Create a retry factory for retries that have no limit
+   *
+   * @param startWait
+   *          The wait ms for the first retry
+   * @param waitIncrement
+   *          The amount of ms to increment the wait on subsequent retries
+   * @param maxWait
+   *          The max amount of wait time between retries
+   */
+  public RetryFactory(long startWait, long waitIncrement, long maxWait) {
+    this.maxRetries = Retry.MAX_RETRY_DISABLED;
+    this.startWait = startWait;
+    this.maxWait = maxWait;
+    this.waitIncrement = waitIncrement;
+  }
+
   public Retry create() {
     return new Retry(maxRetries, startWait, waitIncrement, maxWait);
   }
diff --git a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryFactoryTest.java b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryFactoryTest.java
index cb3d608f1d..9ba19a4e67 100644
--- a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryFactoryTest.java
+++ b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryFactoryTest.java
@@ -36,4 +36,16 @@ public void properArgumentsInRetry() {
     Assert.assertEquals(waitIncrement, retry.getWaitIncrement());
   }
 
+  @Test
+  public void properArgumentsInUnlimitedRetry() {
+    long startWait = 50l, maxWait = 5000l, waitIncrement = 500l;
+    RetryFactory factory = new RetryFactory(startWait, waitIncrement, maxWait);
+    Retry retry = factory.create();
+
+    Assert.assertEquals(Retry.MAX_RETRY_DISABLED, retry.getMaxRetries());
+    Assert.assertEquals(startWait, retry.getCurrentWait());
+    Assert.assertEquals(maxWait, retry.getMaxWait());
+    Assert.assertEquals(waitIncrement, retry.getWaitIncrement());
+  }
+
 }
diff --git a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryTest.java b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryTest.java
index e37af015e4..6bbd1ffb80 100644
--- a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryTest.java
+++ b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryTest.java
@@ -28,10 +28,14 @@
 
   private Retry retry;
   long initialWait = 1000l, waitIncrement = 1000l, maxRetries = 5;
+  private Retry unlimitedRetry1;
+  private Retry unlimitedRetry2;
 
   @Before
   public void setup() {
     retry = new Retry(maxRetries, initialWait, waitIncrement, maxRetries * 1000l);
+    unlimitedRetry1 = new Retry(initialWait, waitIncrement, maxRetries * 1000l);
+    unlimitedRetry2 = new Retry(-10, initialWait, waitIncrement, maxRetries * 1000l);
   }
 
   @Test
@@ -124,4 +128,23 @@ public void testBoundedWaitIncrement() throws InterruptedException {
 
     EasyMock.verify(retry);
   }
+
+  @Test
+  public void testIsMaxRetryDisabled() {
+    Assert.assertFalse(retry.isMaxRetryDisabled());
+    Assert.assertTrue(unlimitedRetry1.isMaxRetryDisabled());
+    Assert.assertTrue(unlimitedRetry2.isMaxRetryDisabled());
+    Assert.assertEquals(Retry.MAX_RETRY_DISABLED, unlimitedRetry1.getMaxRetries());
+    Assert.assertEquals(-10, unlimitedRetry2.getMaxRetries());
+  }
+
+  @Test
+  public void testUnlimitedRetry() {
+    for (int i = 0; i < Integer.MAX_VALUE; i++) {
+      Assert.assertTrue(unlimitedRetry1.canRetry());
+      unlimitedRetry1.useRetry();
+      Assert.assertTrue(unlimitedRetry2.canRetry());
+      unlimitedRetry2.useRetry();
+    }
+  }
 }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 0f28601fb4..3f56def9f7 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -372,14 +372,16 @@ public void run() {
           + minBlockSize + ". Either increase the " + Property.TSERV_WALOG_MAX_SIZE + " or decrease dfs.namenode.fs-limits.min-block-size in hdfs-site.xml.");
 
     final long toleratedWalCreationFailures = aconf.getCount(Property.TSERV_WALOG_TOLERATED_CREATION_FAILURES);
-    final long walCreationFailureRetryIncrement = aconf.getTimeInMillis(Property.TSERV_WALOG_TOLERATED_WAIT_INCREMENT);
-    final long walCreationFailureRetryMax = aconf.getTimeInMillis(Property.TSERV_WALOG_TOLERATED_MAXIMUM_WAIT_DURATION);
-    // Tolerate `toleratedWalCreationFailures` failures, waiting `walCreationFailureRetryIncrement` milliseconds after the first failure,
-    // incrementing the next wait period by the same value, for a maximum of `walCreationFailureRetryMax` retries.
-    final RetryFactory walCreationRetryFactory = new RetryFactory(toleratedWalCreationFailures, walCreationFailureRetryIncrement,
-        walCreationFailureRetryIncrement, walCreationFailureRetryMax);
-
-    logger = new TabletServerLogger(this, walogMaxSize, syncCounter, flushCounter, walCreationRetryFactory, walogMaxAge);
+    final long walFailureRetryIncrement = aconf.getTimeInMillis(Property.TSERV_WALOG_TOLERATED_WAIT_INCREMENT);
+    final long walFailureRetryMax = aconf.getTimeInMillis(Property.TSERV_WALOG_TOLERATED_MAXIMUM_WAIT_DURATION);
+    // Tolerate `toleratedWalCreationFailures` failures, waiting `walFailureRetryIncrement` milliseconds after the first failure,
+    // incrementing the next wait period by the same value, up to a maximum wait of `walFailureRetryMax` milliseconds.
+    final RetryFactory walCreationRetryFactory = new RetryFactory(toleratedWalCreationFailures, walFailureRetryIncrement, walFailureRetryIncrement,
+        walFailureRetryMax);
+    // Tolerate infinite failures for the write, however backing off the same as for creation failures.
+    final RetryFactory walWritingRetryFactory = new RetryFactory(walFailureRetryIncrement, walFailureRetryIncrement, walFailureRetryMax);
+
+    logger = new TabletServerLogger(this, walogMaxSize, syncCounter, flushCounter, walCreationRetryFactory, walWritingRetryFactory, walogMaxAge);
     this.resourceManager = new TabletServerResourceManager(this, fs);
     this.security = AuditedSecurityOperation.getInstance(this);
 
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index c1fd6bb43f..da3ef0b23d 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -16,8 +16,6 @@
  */
 package org.apache.accumulo.tserver.log;
 
-import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -95,15 +93,15 @@
   // Use a ReadWriteLock to allow multiple threads to use the log set, but obtain a write lock to change them
   private final ReentrantReadWriteLock logIdLock = new ReentrantReadWriteLock();
 
-  private final AtomicInteger seqGen = new AtomicInteger();
-
   private final AtomicLong syncCounter;
   private final AtomicLong flushCounter;
 
   private long createTime = 0;
 
-  private final RetryFactory retryFactory;
-  private Retry retry = null;
+  private final RetryFactory createRetryFactory;
+  private Retry createRetry = null;
+
+  private final RetryFactory writeRetryFactory;
 
   static private abstract class TestCallWithWriteLock {
     abstract boolean test();
@@ -148,13 +146,15 @@ private static void testLockAndRun(final ReadWriteLock rwlock, TestCallWithWrite
     }
   }
 
-  public TabletServerLogger(TabletServer tserver, long maxSize, AtomicLong syncCounter, AtomicLong flushCounter, RetryFactory retryFactory, long maxAge) {
+  public TabletServerLogger(TabletServer tserver, long maxSize, AtomicLong syncCounter, AtomicLong flushCounter, RetryFactory createRetryFactory,
+      RetryFactory writeRetryFactory, long maxAge) {
     this.tserver = tserver;
     this.maxSize = maxSize;
     this.syncCounter = syncCounter;
     this.flushCounter = flushCounter;
-    this.retryFactory = retryFactory;
-    this.retry = null;
+    this.createRetryFactory = createRetryFactory;
+    this.createRetry = null;
+    this.writeRetryFactory = writeRetryFactory;
     this.maxAge = maxAge;
   }
 
@@ -224,8 +224,8 @@ synchronized private void createLogger() throws IOException {
         log.info("Using next log " + currentLog.getFileName());
 
         // When we successfully create a WAL, make sure to reset the Retry.
-        if (null != retry) {
-          retry = null;
+        if (null != createRetry) {
+          createRetry = null;
         }
 
         this.createTime = System.currentTimeMillis();
@@ -234,18 +234,18 @@ synchronized private void createLogger() throws IOException {
         throw new RuntimeException("Error: unexpected type seen: " + next);
       }
     } catch (Exception t) {
-      if (null == retry) {
-        retry = retryFactory.create();
+      if (null == createRetry) {
+        createRetry = createRetryFactory.create();
       }
 
       // We have more retries or we exceeded the maximum number of accepted failures
-      if (retry.canRetry()) {
-        // Use the retry and record the time in which we did so
-        retry.useRetry();
+      if (createRetry.canRetry()) {
+        // Use the createRetry and record the time in which we did so
+        createRetry.useRetry();
 
         try {
           // Backoff
-          retry.waitForNextAttempt();
+          createRetry.waitForNextAttempt();
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
           throw new RuntimeException(e);
@@ -348,20 +348,26 @@ synchronized private void close() throws IOException {
   }
 
   interface Writer {
-    LoggerOperation write(DfsLogger logger, int seq) throws Exception;
+    LoggerOperation write(DfsLogger logger) throws Exception;
   }
 
-  private int write(CommitSession commitSession, boolean mincFinish, Writer writer) throws IOException {
+  private void write(CommitSession commitSession, boolean mincFinish, Writer writer) throws IOException {
+    write(commitSession, mincFinish, writer, writeRetryFactory.create());
+  }
+
+  private void write(CommitSession commitSession, boolean mincFinish, Writer writer, Retry writeRetry) throws IOException {
     List<CommitSession> sessions = Collections.singletonList(commitSession);
-    return write(sessions, mincFinish, writer);
+    write(sessions, mincFinish, writer, writeRetry);
   }
 
-  private int write(final Collection<CommitSession> sessions, boolean mincFinish, Writer writer) throws IOException {
+  private void write(final Collection<CommitSession> sessions, boolean mincFinish, Writer writer) throws IOException {
+    write(sessions, mincFinish, writer, writeRetryFactory.create());
+  }
+
+  private void write(final Collection<CommitSession> sessions, boolean mincFinish, Writer writer, Retry writeRetry) throws IOException {
     // Work very hard not to lock this during calls to the outside world
     int currentLogId = logId.get();
 
-    int seq = -1;
-    int attempt = 1;
     boolean success = false;
     while (!success) {
       try {
@@ -379,7 +385,7 @@ private int write(final Collection<CommitSession> sessions, boolean mincFinish,
             if (commitSession.beginUpdatingLogsUsed(copy, mincFinish)) {
               try {
                 // Scribble out a tablet definition and then write to the metadata table
-                defineTablet(commitSession);
+                defineTablet(commitSession, writeRetry);
               } finally {
                 commitSession.finishUpdatingLogsUsed();
               }
@@ -400,24 +406,26 @@ private int write(final Collection<CommitSession> sessions, boolean mincFinish,
         if (currentLogId == logId.get()) {
 
           // write the mutation to the logs
-          seq = seqGen.incrementAndGet();
-          if (seq < 0)
-            throw new RuntimeException("Logger sequence generator wrapped!  Onos!!!11!eleven");
-          LoggerOperation lop = writer.write(copy, seq);
+          LoggerOperation lop = writer.write(copy);
           lop.await();
 
           // double-check: did the log set change?
           success = (currentLogId == logId.get());
         }
       } catch (DfsLogger.LogClosedException ex) {
-        log.debug("Logs closed while writing, retrying " + attempt);
+        log.debug("Logs closed while writing, retrying attempt " + writeRetry.retriesCompleted());
       } catch (Exception t) {
-        if (attempt != 1) {
-          log.error("Unexpected error writing to log, retrying attempt " + attempt, t);
+        log.warn("Failed to write to WAL, retrying attempt " + writeRetry.retriesCompleted(), t);
+
+        try {
+          // Backoff
+          writeRetry.waitForNextAttempt();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(e);
         }
-        sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
       } finally {
-        attempt++;
+        writeRetry.useRetry();
       }
       // Some sort of write failure occurred. Grab the write lock and reset the logs.
       // But since multiple threads will attempt it, only attempt the reset when
@@ -453,42 +461,40 @@ void withWriteLock() throws IOException {
         closeForReplication(sessions);
       }
     });
-    return seq;
   }
 
   protected void closeForReplication(Collection<CommitSession> sessions) {
     // TODO We can close the WAL here for replication purposes
   }
 
-  public int defineTablet(final CommitSession commitSession) throws IOException {
+  public void defineTablet(final CommitSession commitSession, final Retry writeRetry) throws IOException {
     // scribble this into the metadata tablet, too.
-    return write(commitSession, false, new Writer() {
+    write(commitSession, false, new Writer() {
       @Override
-      public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
+      public LoggerOperation write(DfsLogger logger) throws Exception {
         logger.defineTablet(commitSession.getWALogSeq(), commitSession.getLogId(), commitSession.getExtent());
         return DfsLogger.NO_WAIT_LOGGER_OP;
       }
-    });
+    }, writeRetry);
   }
 
-  public int log(final CommitSession commitSession, final long tabletSeq, final Mutation m, final Durability durability) throws IOException {
+  public void log(final CommitSession commitSession, final long tabletSeq, final Mutation m, final Durability durability) throws IOException {
     if (durability == Durability.NONE) {
-      return -1;
+      return;
     }
     if (durability == Durability.DEFAULT) {
       throw new IllegalArgumentException("Unexpected durability " + durability);
     }
-    int seq = write(commitSession, false, new Writer() {
+    write(commitSession, false, new Writer() {
       @Override
-      public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
+      public LoggerOperation write(DfsLogger logger) throws Exception {
         return logger.log(tabletSeq, commitSession.getLogId(), m, durability);
       }
     });
     logSizeEstimate.addAndGet(m.numBytes());
-    return seq;
   }
 
-  public int logManyTablets(Map<CommitSession,Mutations> mutations) throws IOException {
+  public void logManyTablets(Map<CommitSession,Mutations> mutations) throws IOException {
 
     final Map<CommitSession,Mutations> loggables = new HashMap<>(mutations);
     for (Entry<CommitSession,Mutations> entry : mutations.entrySet()) {
@@ -497,11 +503,11 @@ public int logManyTablets(Map<CommitSession,Mutations> mutations) throws IOExcep
       }
     }
     if (loggables.size() == 0)
-      return -1;
+      return;
 
-    int seq = write(loggables.keySet(), false, new Writer() {
+    write(loggables.keySet(), false, new Writer() {
       @Override
-      public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
+      public LoggerOperation write(DfsLogger logger) throws Exception {
         List<TabletMutations> copy = new ArrayList<>(loggables.size());
         for (Entry<CommitSession,Mutations> entry : loggables.entrySet()) {
           CommitSession cs = entry.getKey();
@@ -519,7 +525,6 @@ public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
         logSizeEstimate.addAndGet(m.numBytes());
       }
     }
-    return seq;
   }
 
   public void minorCompactionFinished(final CommitSession commitSession, final String fullyQualifiedFileName, final long walogSeq, final Durability durability)
@@ -527,23 +532,23 @@ public void minorCompactionFinished(final CommitSession commitSession, final Str
 
     long t1 = System.currentTimeMillis();
 
-    int seq = write(commitSession, true, new Writer() {
+    write(commitSession, true, new Writer() {
       @Override
-      public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
+      public LoggerOperation write(DfsLogger logger) throws Exception {
         return logger.minorCompactionFinished(walogSeq, commitSession.getLogId(), fullyQualifiedFileName, durability);
       }
     });
 
     long t2 = System.currentTimeMillis();
 
-    log.debug(" wrote MinC finish  {}: writeTime:{}ms  durability:{}", seq, (t2 - t1), durability);
+    log.debug(" wrote MinC finish: writeTime:{}ms  durability:{}", (t2 - t1), durability);
   }
 
   public long minorCompactionStarted(final CommitSession commitSession, final long seq, final String fullyQualifiedFileName, final Durability durability)
       throws IOException {
     write(commitSession, false, new Writer() {
       @Override
-      public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
+      public LoggerOperation write(DfsLogger logger) throws Exception {
         return logger.minorCompactionStarted(seq, commitSession.getLogId(), fullyQualifiedFileName, durability);
       }
     });


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services