Posted to commits@hbase.apache.org by st...@apache.org on 2020/01/09 23:23:50 UTC

[hbase] branch branch-2 updated (e78ce46 -> 58b0e0f)

This is an automated email from the ASF dual-hosted git repository.

stack pushed a change to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git.


    from e78ce46  HBASE-23601: OutputSink.WriterThread exception gets stuck and repeated indefinitely (#956)
     new 1047246  HBASE-23668 Master log starts filling with "Flush journal status" messages
     new 58b0e0f  Revert "HBASE-23601: OutputSink.WriterThread exception gets stuck and repeated indefinitely (#956)"

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../store/region/RegionFlusherAndCompactor.java    |  15 ++-
 .../store/region/RegionProcedureStore.java         |  14 ++-
 .../apache/hadoop/hbase/regionserver/HRegion.java  |   4 +-
 .../RegionReplicaReplicationEndpoint.java          |   1 -
 .../org/apache/hadoop/hbase/wal/OutputSink.java    |  20 +---
 .../org/apache/hadoop/hbase/wal/WALSplitter.java   |   1 -
 .../hadoop/hbase/wal/TestOutputSinkWriter.java     | 126 ---------------------
 7 files changed, 24 insertions(+), 157 deletions(-)
 delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestOutputSinkWriter.java


[hbase] 02/02: Revert "HBASE-23601: OutputSink.WriterThread exception gets stuck and repeated indefinitely (#956)"

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stack pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 58b0e0f3fcfd7ca53fd12692b588ce1b3fe61057
Author: stack <st...@apache.org>
AuthorDate: Thu Jan 9 15:23:36 2020 -0800

    Revert "HBASE-23601: OutputSink.WriterThread exception gets stuck and repeated indefinietly (#956)"
    
    This reverts commit e78ce468d8e37df49151c16c39a9607258c3c096.
---
 .../RegionReplicaReplicationEndpoint.java          |   1 -
 .../org/apache/hadoop/hbase/wal/OutputSink.java    |  20 +---
 .../org/apache/hadoop/hbase/wal/WALSplitter.java   |   1 -
 .../hadoop/hbase/wal/TestOutputSinkWriter.java     | 126 ---------------------
 4 files changed, 1 insertion(+), 147 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
index cead808..60f693a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
@@ -249,7 +249,6 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
       } catch (IOException e) {
         LOG.warn("Received IOException while trying to replicate"
             + StringUtils.stringifyException(e));
-        outputSink.restartWriterThreadsIfNeeded();
       }
     }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java
index de62c4d..b589134 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java
@@ -89,19 +89,6 @@ public abstract class OutputSink {
     }
   }
 
-  public synchronized void restartWriterThreadsIfNeeded() {
-    for(int i = 0; i< writerThreads.size(); i++){
-      WriterThread t = writerThreads.get(i);
-      if (!t.isAlive()){
-        String threadName = t.getName();
-        LOG.debug("Replacing dead thread: " + threadName);
-        WriterThread newThread = new WriterThread(controller, entryBuffers, this, threadName);
-        newThread.start();
-        writerThreads.set(i, newThread);
-      }
-    }
-  }
-
   /**
    * Wait for writer threads to dump all info to the sink
    *
@@ -177,12 +164,7 @@ public abstract class OutputSink {
 
     WriterThread(WALSplitter.PipelineController controller, EntryBuffers entryBuffers,
         OutputSink sink, int i) {
-      this(controller, entryBuffers, sink, Thread.currentThread().getName() + "-Writer-" + i);
-    }
-
-    WriterThread(WALSplitter.PipelineController controller, EntryBuffers entryBuffers,
-        OutputSink sink, String threadName) {
-      super(threadName);
+      super(Thread.currentThread().getName() + "-Writer-" + i);
       this.controller = controller;
       this.entryBuffers = entryBuffers;
       outputSink = sink;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index 6b75f1d..d7bbd07 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -475,7 +475,6 @@ public class WALSplitter {
       if (thrown == null) {
         return;
       }
-      this.thrown.set(null);
       if (thrown instanceof IOException) {
         throw new IOException(thrown);
       } else {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestOutputSinkWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestOutputSinkWriter.java
deleted file mode 100644
index 5249835..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestOutputSinkWriter.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.wal;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.RegionServerTests;
-import org.junit.Assert;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category({ RegionServerTests.class, MediumTests.class })
-public class TestOutputSinkWriter {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(
-          TestOutputSinkWriter.class);
-
-  @Test
-  public void testExeptionHandling() throws IOException, InterruptedException {
-    WALSplitter.PipelineController controller = new WALSplitter.PipelineController();
-    BrokenEntryBuffers entryBuffers = new BrokenEntryBuffers(controller, 2000);
-    OutputSink sink = new OutputSink(controller, entryBuffers, 1) {
-
-      @Override public List<Path> finishWritingAndClose() throws IOException {
-        return null;
-      }
-
-      @Override public Map<byte[],Long> getOutputCounts() {
-        return null;
-      }
-
-      @Override public int getNumberOfRecoveredRegions() {
-        return 0;
-      }
-
-      @Override public void append(WALSplitter.RegionEntryBuffer buffer) throws IOException {
-
-      }
-
-      @Override public boolean keepRegionEvent(WAL.Entry entry) {
-        return false;
-      }
-    };
-
-    //start the Writer thread and give it time trow the exception
-    sink.startWriterThreads();
-    Thread.sleep(1000L);
-
-    //make sure the exception is stored
-    try {
-      controller.checkForErrors();
-      Assert.fail();
-    }
-    catch (RuntimeException re){
-      Assert.assertTrue(true);
-    }
-
-    sink.restartWriterThreadsIfNeeded();
-
-    //after the check the stored exception should be gone
-    try {
-      controller.checkForErrors();
-    }
-    catch (RuntimeException re){
-      Assert.fail();
-    }
-
-    //prep another exception and wait for it to be thrown
-    entryBuffers.setThrowError(true);
-    Thread.sleep(1000L);
-
-    //make sure the exception is stored
-    try {
-      controller.checkForErrors();
-      Assert.fail();
-    }
-    catch (RuntimeException re){
-      Assert.assertTrue(true);
-    }
-  }
-
-  static class BrokenEntryBuffers extends EntryBuffers{
-    boolean throwError = true;
-
-    public BrokenEntryBuffers(WALSplitter.PipelineController controller, long maxHeapUsage) {
-      super(controller, maxHeapUsage);
-    }
-
-    @Override
-    synchronized WALSplitter.RegionEntryBuffer getChunkToWrite() {
-      //This just emulates something going wrong with in the Writer
-      if(throwError){
-        throwError = false;
-        throw new RuntimeException("testing");
-      }
-      return null;
-    }
-
-    public void setThrowError(boolean newValue){
-      throwError = newValue;
-    }
-  };
-}


[hbase] 01/02: HBASE-23668 Master log starts filling with "Flush journal status" messages

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stack pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 1047246717c9d866401938264f3b2c523c3dfd83
Author: stack <st...@apache.org>
AuthorDate: Thu Jan 9 11:31:20 2020 -0800

    HBASE-23668 Master log starts filling with "Flush journal status" messages
---
 .../store/region/RegionFlusherAndCompactor.java           | 15 +++++++++++++--
 .../procedure2/store/region/RegionProcedureStore.java     | 14 ++++++++------
 .../org/apache/hadoop/hbase/regionserver/HRegion.java     |  4 ++--
 3 files changed, 23 insertions(+), 10 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionFlusherAndCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionFlusherAndCompactor.java
index 53bf66b..5f2ff07 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionFlusherAndCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionFlusherAndCompactor.java
@@ -120,6 +120,8 @@ class RegionFlusherAndCompactor implements Closeable {
     flushThread.start();
     compactExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
       .setNameFormat("Procedure-Region-Store-Compactor").setDaemon(true).build());
+    LOG.info("Constructor flushSize={}, flushPerChanges={}, flushIntervalMs={}, " +
+        "compactMin=", flushSize, flushPerChanges, flushIntervalMs, compactMin);
   }
 
   // inject our flush related configurations
@@ -130,6 +132,8 @@ class RegionFlusherAndCompactor implements Closeable {
     conf.setLong(HRegion.MEMSTORE_FLUSH_PER_CHANGES, flushPerChanges);
     long flushIntervalMs = conf.getLong(FLUSH_INTERVAL_MS_KEY, DEFAULT_FLUSH_INTERVAL_MS);
     conf.setLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, flushIntervalMs);
+    LOG.info("Injected flushSize={}, flushPerChanges={}, flushIntervalMs={}", flushSize,
+      flushPerChanges, flushIntervalMs);
   }
 
   private void compact() {
@@ -180,6 +184,7 @@ class RegionFlusherAndCompactor implements Closeable {
       changesAfterLastFlush.set(0);
       try {
         region.flush(true);
+        lastFlushTime = EnvironmentEdgeManager.currentTime();
       } catch (IOException e) {
         LOG.error(HBaseMarkers.FATAL, "Failed to flush procedure store region, aborting...", e);
         abortable.abort("Failed to flush procedure store region", e);
@@ -207,8 +212,14 @@ class RegionFlusherAndCompactor implements Closeable {
   }
 
   private boolean shouldFlush(long changes) {
-    return region.getMemStoreHeapSize() + region.getMemStoreOffHeapSize() >= flushSize ||
+    boolean flush = region.getMemStoreHeapSize() + region.getMemStoreOffHeapSize() >= flushSize ||
       changes > flushPerChanges;
+    if (flush && LOG.isTraceEnabled()) {
+      LOG.trace("shouldFlush memStoreSize={}, flushSize={}, changes={}, flushPerChanges={}",
+        region.getMemStoreHeapSize() + region.getMemStoreOffHeapSize(), flushSize, changes,
+        flushPerChanges);
+    }
+    return flush;
   }
 
   void onUpdate() {
@@ -237,4 +248,4 @@ class RegionFlusherAndCompactor implements Closeable {
     flushThread.interrupt();
     compactExecutor.shutdown();
   }
-}
\ No newline at end of file
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java
index 05a5059..be543d0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java
@@ -306,7 +306,7 @@ public class RegionProcedureStore extends ProcedureStoreBase {
     if (!fs.exists(procWALDir)) {
       return;
     }
-    LOG.info("The old procedure wal directory {} exists, start migrating", procWALDir);
+    LOG.info("The old WALProcedureStore wal directory {} exists, migrating...", procWALDir);
     WALProcedureStore store = new WALProcedureStore(conf, leaseRecovery);
     store.start(numThreads);
     store.recoverLease();
@@ -347,7 +347,7 @@ public class RegionProcedureStore extends ProcedureStoreBase {
         }
       }
     });
-    LOG.info("The max pid is {}, and the max pid of all loaded procedures is {}",
+    LOG.info("The WALProcedureStore max pid is {}, and the max pid of all loaded procedures is {}",
       maxProcIdSet.longValue(), maxProcIdFromProcs.longValue());
     // Theoretically, the maxProcIdSet should be greater than or equal to maxProcIdFromProcs, but
     // anyway, let's do a check here.
@@ -358,12 +358,13 @@ public class RegionProcedureStore extends ProcedureStoreBase {
           PROC_QUALIFIER, EMPTY_BYTE_ARRAY));
       }
     } else if (maxProcIdSet.longValue() < maxProcIdFromProcs.longValue()) {
-      LOG.warn("The max pid is less than the max pid of all loaded procedures");
+      LOG.warn("The WALProcedureStore max pid is less than the max pid of all loaded procedures");
     }
     if (!fs.delete(procWALDir, true)) {
-      throw new IOException("Failed to delete the migrated proc wal directory " + procWALDir);
+      throw new IOException("Failed to delete the WALProcedureStore migrated proc wal directory " +
+        procWALDir);
     }
-    LOG.info("Migration finished");
+    LOG.info("Migration of WALProcedureStore finished");
   }
 
   @Override
@@ -382,7 +383,8 @@ public class RegionProcedureStore extends ProcedureStoreBase {
     if (conf.get(USE_HSYNC_KEY) != null) {
       conf.set(HRegion.WAL_HSYNC_CONF_KEY, conf.get(USE_HSYNC_KEY));
     }
-    conf.setInt(AbstractFSWAL.RING_BUFFER_SLOT_COUNT, IntMath.ceilingPowerOfTwo(16 * numThreads));
+    conf.setInt(AbstractFSWAL.RING_BUFFER_SLOT_COUNT,
+      IntMath.ceilingPowerOfTwo(16 * numThreads));
 
     walRoller = RegionProcedureStoreWALRoller.create(conf, server, fs, rootDir, globalWALRootDir);
     walRoller.start();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index a55e2fe..dac034d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2401,7 +2401,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           flushesQueued.reset();
         }
 
-        status.markComplete("Flush successful");
+        status.markComplete("Flush successful " + fs.toString());
         return fs;
       } finally {
         synchronized (writestate) {
@@ -8871,4 +8871,4 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
     }
   }
-}
\ No newline at end of file
+}