Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/02/16 19:31:44 UTC

[37/50] [abbrv] hadoop git commit: HDFS-4265. BKJM doesn't take advantage of speculative reads. Contributed by Rakesh R.

HDFS-4265. BKJM doesn't take advantage of speculative reads. Contributed by Rakesh R.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/08bc0c03
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/08bc0c03
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/08bc0c03

Branch: refs/heads/HDFS-7285
Commit: 08bc0c037b9b6761c84dfbb4e538b1b4e5f92bbc
Parents: 6384707
Author: Akira Ajisaka <aa...@apache.org>
Authored: Fri Feb 13 15:20:52 2015 -0800
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon Feb 16 10:29:50 2015 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   2 +
 .../bkjournal/BookKeeperJournalManager.java     |  32 +++-
 .../TestBookKeeperSpeculativeRead.java          | 167 +++++++++++++++++++
 3 files changed, 195 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/08bc0c03/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 610d45c..1ec2bd2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -962,6 +962,8 @@ Release 2.7.0 - UNRELEASED
       HDFS-7776. Adding additional unit tests for Quota By Storage Type.
       (Xiaoyu Yao via Arpit Agarwal)
 
+      HDFS-4265. BKJM doesn't take advantage of speculative reads. (Rakesh R
+      via aajisaka)
 
 Release 2.6.1 - UNRELEASED
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08bc0c03/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
index 227be6b..51905c0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.contrib.bkjournal.BKJournalProtos.VersionProto;
 import com.google.protobuf.TextFormat;
 import static com.google.common.base.Charsets.UTF_8;
 
+import org.apache.commons.io.Charsets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import com.google.common.annotations.VisibleForTesting;
@@ -142,6 +143,15 @@ public class BookKeeperJournalManager implements JournalManager {
   public static final String BKJM_ZK_LEDGERS_AVAILABLE_PATH_DEFAULT
     = "/ledgers/available";
 
+  public static final String BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_MS
+    = "dfs.namenode.bookkeeperjournal.speculativeReadTimeoutMs";
+  public static final int BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_DEFAULT
+    = 2000;
+
+  public static final String BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_SEC
+    = "dfs.namenode.bookkeeperjournal.readEntryTimeoutSec";
+  public static final int BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_DEFAULT = 5;
+
   private ZooKeeper zkc;
   private final Configuration conf;
   private final BookKeeper bkc;
@@ -153,6 +163,8 @@ public class BookKeeperJournalManager implements JournalManager {
   private final int ensembleSize;
   private final int quorumSize;
   private final String digestpw;
+  private final int speculativeReadTimeout;
+  private final int readEntryTimeout;
   private final CountDownLatch zkConnectLatch;
   private final NamespaceInfo nsInfo;
   private boolean initialized = false;
@@ -172,6 +184,11 @@ public class BookKeeperJournalManager implements JournalManager {
                                BKJM_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT);
     quorumSize = conf.getInt(BKJM_BOOKKEEPER_QUORUM_SIZE,
                              BKJM_BOOKKEEPER_QUORUM_SIZE_DEFAULT);
+    speculativeReadTimeout = conf.getInt(
+                             BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_MS,
+                             BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_DEFAULT);
+    readEntryTimeout = conf.getInt(BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_SEC,
+                             BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_DEFAULT);
 
     ledgerPath = basePath + "/ledgers";
     String maxTxIdPath = basePath + "/maxtxid";
@@ -196,7 +213,10 @@ public class BookKeeperJournalManager implements JournalManager {
       }
 
       prepareBookKeeperEnv();
-      bkc = new BookKeeper(new ClientConfiguration(), zkc);
+      ClientConfiguration clientConf = new ClientConfiguration();
+      clientConf.setSpeculativeReadTimeout(speculativeReadTimeout);
+      clientConf.setReadEntryTimeout(readEntryTimeout);
+      bkc = new BookKeeper(clientConf, zkc);
     } catch (KeeperException e) {
       throw new IOException("Error initializing zk", e);
     } catch (InterruptedException ie) {
@@ -385,7 +405,7 @@ public class BookKeeperJournalManager implements JournalManager {
       }
       currentLedger = bkc.createLedger(ensembleSize, quorumSize,
                                        BookKeeper.DigestType.MAC,
-                                       digestpw.getBytes());
+                                       digestpw.getBytes(Charsets.UTF_8));
     } catch (BKException bke) {
       throw new IOException("Error creating ledger", bke);
     } catch (KeeperException ke) {
@@ -522,10 +542,10 @@ public class BookKeeperJournalManager implements JournalManager {
           LedgerHandle h;
           if (l.isInProgress()) { // we don't want to fence the current journal
             h = bkc.openLedgerNoRecovery(l.getLedgerId(),
-                BookKeeper.DigestType.MAC, digestpw.getBytes());
+                BookKeeper.DigestType.MAC, digestpw.getBytes(Charsets.UTF_8));
           } else {
             h = bkc.openLedger(l.getLedgerId(), BookKeeper.DigestType.MAC,
-                digestpw.getBytes());
+                digestpw.getBytes(Charsets.UTF_8));
           }
           elis = new BookKeeperEditLogInputStream(h, l);
           elis.skipTo(fromTxId);
@@ -732,11 +752,11 @@ public class BookKeeperJournalManager implements JournalManager {
       if (fence) {
         lh = bkc.openLedger(l.getLedgerId(),
                             BookKeeper.DigestType.MAC,
-                            digestpw.getBytes());
+                            digestpw.getBytes(Charsets.UTF_8));
       } else {
         lh = bkc.openLedgerNoRecovery(l.getLedgerId(),
                                       BookKeeper.DigestType.MAC,
-                                      digestpw.getBytes());
+                                      digestpw.getBytes(Charsets.UTF_8));
       }
     } catch (BKException bke) {
       throw new IOException("Exception opening ledger for " + l, bke);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08bc0c03/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperSpeculativeRead.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperSpeculativeRead.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperSpeculativeRead.java
new file mode 100644
index 0000000..f5b86bc
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperSpeculativeRead.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.contrib.bkjournal;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
+import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestBookKeeperSpeculativeRead {
+  private static final Log LOG = LogFactory
+      .getLog(TestBookKeeperSpeculativeRead.class);
+
+  private ZooKeeper zkc;
+  private static BKJMUtil bkutil;
+  private static int numLocalBookies = 1;
+  private static List<BookieServer> bks = new ArrayList<BookieServer>();
+
+  @BeforeClass
+  public static void setupBookkeeper() throws Exception {
+    bkutil = new BKJMUtil(1);
+    bkutil.start();
+  }
+
+  @AfterClass
+  public static void teardownBookkeeper() throws Exception {
+    bkutil.teardown();
+    for (BookieServer bk : bks) {
+      bk.shutdown();
+    }
+  }
+
+  @Before
+  public void setup() throws Exception {
+    zkc = BKJMUtil.connectZooKeeper();
+  }
+
+  @After
+  public void teardown() throws Exception {
+    zkc.close();
+  }
+
+  private NamespaceInfo newNSInfo() {
+    Random r = new Random();
+    return new NamespaceInfo(r.nextInt(), "testCluster", "TestBPID", -1);
+  }
+
+  /**
+   * Test the speculative read feature supported by BookKeeper. Keep one bookie
+   * alive and put all the other bookies to sleep. Without speculative reads,
+   * the client would hang for a long time reading entries from BookKeeper.
+   */
+  @Test(timeout = 120000)
+  public void testSpeculativeRead() throws Exception {
+    // start 9 more bookie servers
+    for (int i = 1; i < 10; i++) {
+      bks.add(bkutil.newBookie());
+    }
+    NamespaceInfo nsi = newNSInfo();
+    Configuration conf = new Configuration();
+    int ensembleSize = numLocalBookies + 9;
+    conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
+        ensembleSize);
+    conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
+        ensembleSize);
+    conf.setInt(
+        BookKeeperJournalManager.BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_MS,
+        100);
+    // set the read entry timeout to 60 minutes
+    conf.setInt(
+        BookKeeperJournalManager.BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_SEC, 3600);
+    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
+        BKJMUtil.createJournalURI("/hdfsjournal-specread"), nsi);
+    bkjm.format(nsi);
+
+    final long numTransactions = 1000;
+    EditLogOutputStream out = bkjm.startLogSegment(1,
+        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+    for (long i = 1; i <= numTransactions; i++) {
+      FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+      op.setTransactionId(i);
+      out.write(op);
+    }
+    out.close();
+    bkjm.finalizeLogSegment(1, numTransactions);
+
+    List<EditLogInputStream> in = new ArrayList<EditLogInputStream>();
+    bkjm.selectInputStreams(in, 1, true);
+
+    // put the 9 extra bookie servers to sleep. Now only one server is running
+    // and responding to clients.
+    CountDownLatch sleepLatch = new CountDownLatch(1);
+    for (final BookieServer bookie : bks) {
+      sleepBookie(sleepLatch, bookie);
+    }
+    try {
+      assertEquals(numTransactions,
+          FSEditLogTestUtil.countTransactionsInStream(in.get(0)));
+    } finally {
+      in.get(0).close();
+      sleepLatch.countDown();
+      bkjm.close();
+    }
+  }
+
+  /**
+   * Sleep a bookie until the latch is counted down.
+   *
+   * @param latch
+   *          latch to wait on
+   * @param bookie
+   *          bookie server
+   * @throws Exception
+   */
+  private void sleepBookie(final CountDownLatch latch, final BookieServer bookie)
+      throws Exception {
+
+    Thread sleeper = new Thread() {
+      public void run() {
+        try {
+          bookie.suspendProcessing();
+          latch.await(2, TimeUnit.MINUTES);
+          bookie.resumeProcessing();
+        } catch (Exception e) {
+          LOG.error("Error suspending bookie", e);
+        }
+      }
+    };
+    sleeper.setName("BookieServerSleeper-" + bookie.getBookie().getId());
+    sleeper.start();
+  }
+}
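
The test above tunes the two new keys through a Hadoop Configuration before constructing the journal manager: quorum equal to the full ensemble, a 100 ms speculative read timeout, and a deliberately long 3600 s read entry timeout so that only speculative reads can keep the read moving once 9 of the 10 bookies are suspended. The same keys can be set outside of tests; the sketch below mirrors the test's setup using the constants added to BookKeeperJournalManager. The journal URI and the timeout values here are placeholders chosen for illustration, not defaults or recommendations.

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.contrib.bkjournal.BookKeeperJournalManager;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;

public class BkjmSpeculativeReadConfigSketch {
  public static BookKeeperJournalManager create(NamespaceInfo nsInfo)
      throws IOException {
    Configuration conf = new Configuration();
    // Speculative read timeout in milliseconds (example value, not the default).
    conf.setInt(
        BookKeeperJournalManager.BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_MS,
        500);
    // Per-entry read timeout in seconds (example value, not the default).
    conf.setInt(
        BookKeeperJournalManager.BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_SEC, 60);
    // Placeholder journal URI; in a real deployment this comes from the
    // NameNode's shared edits configuration.
    URI journalUri = URI.create("bookkeeper://zk1:2181/hdfsjournal");
    return new BookKeeperJournalManager(conf, journalUri, nsInfo);
  }
}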