You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to common-commits@hadoop.apache.org by ay...@apache.org on 2019/09/24 19:51:48 UTC

[hadoop] branch branch-3.2 updated: HDFS-14655. [SBN Read] Namenode crashes if one of The JN is down. Contributed by Ayush Saxena.

This is an automated email from the ASF dual-hosted git repository.

ayushsaxena pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new a0db762  HDFS-14655. [SBN Read] Namenode crashes if one of The JN is down. Contributed by Ayush Saxena.
a0db762 is described below

commit a0db762206ae7ffd1560c0bdd9b3f11f0d062f3c
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Wed Sep 25 01:16:30 2019 +0530

    HDFS-14655. [SBN Read] Namenode crashes if one of The JN is down. Contributed by Ayush Saxena.
---
 .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java |  5 +++-
 .../hdfs/qjournal/client/IPCLoggerChannel.java     | 14 +++++----
 .../hadoop/hdfs/qjournal/client/QuorumCall.java    | 18 ++++++++++++
 .../hdfs/qjournal/client/QuorumJournalManager.java |  2 ++
 .../src/main/resources/hdfs-default.xml            |  8 +++++
 .../hadoop/hdfs/qjournal/MiniJournalCluster.java   |  6 +++-
 .../qjournal/client/TestQuorumJournalManager.java  | 34 ++++++++++++++++++++--
 7 files changed, 78 insertions(+), 9 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index aed535b..428c537 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -1123,6 +1123,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_KEY = "dfs.qjournal.write-txns.timeout.ms";
   public static final String  DFS_QJOURNAL_HTTP_OPEN_TIMEOUT_KEY = "dfs.qjournal.http.open.timeout.ms";
   public static final String  DFS_QJOURNAL_HTTP_READ_TIMEOUT_KEY = "dfs.qjournal.http.read.timeout.ms";
+  public static final String DFS_QJOURNAL_PARALLEL_READ_NUM_THREADS_KEY =
+      "dfs.qjournal.parallel-read.num-threads";
   public static final int     DFS_QJOURNAL_START_SEGMENT_TIMEOUT_DEFAULT = 20000;
   public static final int     DFS_QJOURNAL_PREPARE_RECOVERY_TIMEOUT_DEFAULT = 120000;
   public static final int     DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_DEFAULT = 120000;
@@ -1133,7 +1135,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int     DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_DEFAULT = 20000;
   public static final int     DFS_QJOURNAL_HTTP_OPEN_TIMEOUT_DEFAULT = URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT;
   public static final int     DFS_QJOURNAL_HTTP_READ_TIMEOUT_DEFAULT = URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT;
-  
+  public static final int DFS_QJOURNAL_PARALLEL_READ_NUM_THREADS_DEFAULT = 5;
+
   public static final String DFS_MAX_NUM_BLOCKS_TO_LOG_KEY = "dfs.namenode.max-num-blocks-to-log";
   public static final long   DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT = 1000l;
   
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
index 3a882e5..d5ec5ac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
@@ -27,6 +27,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -54,6 +55,7 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.util.StopWatch;
+import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -270,12 +272,14 @@ public class IPCLoggerChannel implements AsyncLogger {
    */
   @VisibleForTesting
   protected ExecutorService createParallelExecutor() {
-    return Executors.newCachedThreadPool(
-        new ThreadFactoryBuilder()
-            .setDaemon(true)
+    int numThreads =
+        conf.getInt(DFSConfigKeys.DFS_QJOURNAL_PARALLEL_READ_NUM_THREADS_KEY,
+            DFSConfigKeys.DFS_QJOURNAL_PARALLEL_READ_NUM_THREADS_DEFAULT);
+    return new HadoopThreadPoolExecutor(1, numThreads, 60L,
+        TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
+        new ThreadFactoryBuilder().setDaemon(true)
             .setNameFormat("Logger channel (from parallel executor) to " + addr)
-            .setUncaughtExceptionHandler(
-                UncaughtExceptionHandlers.systemExit())
+            .setUncaughtExceptionHandler(UncaughtExceptionHandlers.systemExit())
             .build());
   }
   
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumCall.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumCall.java
index ef32eb1..49d9993 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumCall.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumCall.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.qjournal.client;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.TimeoutException;
@@ -64,6 +66,7 @@ class QuorumCall<KEY, RESULT> {
   private static final float WAIT_PROGRESS_WARN_THRESHOLD = 0.7f;
   private final StopWatch quorumStopWatch;
   private final Timer timer;
+  private final List<ListenableFuture<RESULT>> allCalls;
   
   static <KEY, RESULT> QuorumCall<KEY, RESULT> create(
       Map<KEY, ? extends ListenableFuture<RESULT>> calls, Timer timer) {
@@ -71,6 +74,7 @@ class QuorumCall<KEY, RESULT> {
     for (final Entry<KEY, ? extends ListenableFuture<RESULT>> e : calls.entrySet()) {
       Preconditions.checkArgument(e.getValue() != null,
           "null future for key: " + e.getKey());
+      qr.addCall(e.getValue());
       Futures.addCallback(e.getValue(), new FutureCallback<RESULT>() {
         @Override
         public void onFailure(Throwable t) {
@@ -102,6 +106,11 @@ class QuorumCall<KEY, RESULT> {
     // Only instantiated from factory method above
     this.timer = timer;
     this.quorumStopWatch = new StopWatch(timer);
+    this.allCalls = new ArrayList<>();
+  }
+
+  private void addCall(ListenableFuture<RESULT> call) {
+    allCalls.add(call);
   }
 
   /**
@@ -212,6 +221,15 @@ class QuorumCall<KEY, RESULT> {
   }
 
   /**
+   * Cancel any outstanding calls.
+   */
+  void cancelCalls() {
+    for (ListenableFuture<RESULT> call : allCalls) {
+      call.cancel(true);
+    }
+  }
+
+  /**
    * Check if any of the responses came back with an AssertionError.
    * If so, it re-throws it, even if there was a quorum of responses.
    * This code only runs if assertions are enabled for this class,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
index 674ca70..abc2d4c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
@@ -579,6 +579,8 @@ public class QuorumJournalManager implements JournalManager {
         LOG.debug(msg.toString());
       }
     }
+    // Cancel any outstanding calls to JN's.
+    q.cancelCalls();
 
     int maxAllowedTxns = !onlyDurableTxns ? highestTxnCount :
         responseCounts.get(responseCounts.size() - loggers.getMajoritySize());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 5269bc7..66d504b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4788,6 +4788,14 @@
 </property>
 
 <property>
+  <name>dfs.qjournal.parallel-read.num-threads</name>
+  <value>5</value>
+  <description>
+    Number of threads per JN to be used for tailing edits.
+  </description>
+</property>
+
+<property>
   <name>dfs.quota.by.storage.type.enabled</name>
   <value>true</value>
   <description>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java
index b81b710..e3e862f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java
@@ -196,7 +196,11 @@ public class MiniJournalCluster {
   public JournalNode getJournalNode(int i) {
     return nodes[i].node;
   }
-  
+
+  public String getJournalNodeIpcAddress(int i) {
+    return nodes[i].ipcAddr.toString();
+  }
+
   public void restartJournalNode(int i) throws InterruptedException, IOException {
     JNInfo info = nodes[i];
     JournalNode jn = info.node;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
index f3bb954..cd0216e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
 import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
 import org.apache.hadoop.hdfs.qjournal.server.JournalFaultInjector;
+import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
 import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
 import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
 import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
@@ -62,7 +63,9 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TestName;
 import org.mockito.Mockito;
 import org.mockito.stubbing.Stubber;
 
@@ -87,11 +90,17 @@ public class TestQuorumJournalManager {
     GenericTestUtils.setLogLevel(ProtobufRpcEngine.LOG, Level.ALL);
   }
 
+  @Rule
+  public TestName name = new TestName();
+
   @Before
   public void setup() throws Exception {
     conf = new Configuration();
-    // Don't retry connections - it just slows down the tests.
-    conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
+    if (!name.getMethodName().equals("testSelectThreadCounts")) {
+      // Don't retry connections - it just slows down the tests.
+      conf.setInt(
+          CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
+    }
     // Turn off IPC client caching to handle daemon restarts.
     conf.setInt(
         CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, 0);
@@ -1040,6 +1049,27 @@ public class TestQuorumJournalManager {
   }
 
   @Test
+  public void testSelectThreadCounts() throws Exception {
+    EditLogOutputStream stm =
+        qjm.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+    writeTxns(stm, 1, 10);
+    JournalNode jn0 = cluster.getJournalNode(0);
+    String ipcAddr = cluster.getJournalNodeIpcAddress(0);
+    jn0.stopAndJoin(0);
+    for (int i = 0; i < 1000; i++) {
+      qjm.selectInputStreams(new ArrayList<>(), 1, true, false);
+    }
+    String expectedName =
+        "Logger channel (from parallel executor) to " + ipcAddr;
+    long num = Thread.getAllStackTraces().keySet().stream()
+        .filter((t) -> t.getName().contains(expectedName)).count();
+    // The number of threads for the stopped jn shouldn't be more than the
+    // configured value.
+    assertTrue("Number of threads are : " + num,
+        num <= DFSConfigKeys.DFS_QJOURNAL_PARALLEL_READ_NUM_THREADS_DEFAULT);
+  }
+
+  @Test
   public void testSelectViaRpcTwoJNsError() throws Exception {
     EditLogOutputStream stm =
         qjm.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org