You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/02/16 19:31:44 UTC
[37/50] [abbrv] hadoop git commit: HDFS-4265. BKJM doesn't take
advantage of speculative reads. Contributed by Rakesh R.
HDFS-4265. BKJM doesn't take advantage of speculative reads. Contributed by Rakesh R.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/08bc0c03
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/08bc0c03
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/08bc0c03
Branch: refs/heads/HDFS-7285
Commit: 08bc0c037b9b6761c84dfbb4e538b1b4e5f92bbc
Parents: 6384707
Author: Akira Ajisaka <aa...@apache.org>
Authored: Fri Feb 13 15:20:52 2015 -0800
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon Feb 16 10:29:50 2015 -0800
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 +
.../bkjournal/BookKeeperJournalManager.java | 32 +++-
.../TestBookKeeperSpeculativeRead.java | 167 +++++++++++++++++++
3 files changed, 195 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/08bc0c03/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 610d45c..1ec2bd2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -962,6 +962,8 @@ Release 2.7.0 - UNRELEASED
HDFS-7776. Adding additional unit tests for Quota By Storage Type.
(Xiaoyu Yao via Arpit Agarwal)
+ HDFS-4265. BKJM doesn't take advantage of speculative reads. (Rakesh R
+ via aajisaka)
Release 2.6.1 - UNRELEASED
http://git-wip-us.apache.org/repos/asf/hadoop/blob/08bc0c03/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
index 227be6b..51905c0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.contrib.bkjournal.BKJournalProtos.VersionProto;
import com.google.protobuf.TextFormat;
import static com.google.common.base.Charsets.UTF_8;
+import org.apache.commons.io.Charsets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.google.common.annotations.VisibleForTesting;
@@ -142,6 +143,15 @@ public class BookKeeperJournalManager implements JournalManager {
public static final String BKJM_ZK_LEDGERS_AVAILABLE_PATH_DEFAULT
= "/ledgers/available";
+ public static final String BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_MS
+ = "dfs.namenode.bookkeeperjournal.speculativeReadTimeoutMs";
+ public static final int BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_DEFAULT
+ = 2000;
+
+ public static final String BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_SEC
+ = "dfs.namenode.bookkeeperjournal.readEntryTimeoutSec";
+ public static final int BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_DEFAULT = 5;
+
private ZooKeeper zkc;
private final Configuration conf;
private final BookKeeper bkc;
@@ -153,6 +163,8 @@ public class BookKeeperJournalManager implements JournalManager {
private final int ensembleSize;
private final int quorumSize;
private final String digestpw;
+ private final int speculativeReadTimeout;
+ private final int readEntryTimeout;
private final CountDownLatch zkConnectLatch;
private final NamespaceInfo nsInfo;
private boolean initialized = false;
@@ -172,6 +184,11 @@ public class BookKeeperJournalManager implements JournalManager {
BKJM_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT);
quorumSize = conf.getInt(BKJM_BOOKKEEPER_QUORUM_SIZE,
BKJM_BOOKKEEPER_QUORUM_SIZE_DEFAULT);
+ speculativeReadTimeout = conf.getInt(
+ BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_MS,
+ BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_DEFAULT);
+ readEntryTimeout = conf.getInt(BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_SEC,
+ BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_DEFAULT);
ledgerPath = basePath + "/ledgers";
String maxTxIdPath = basePath + "/maxtxid";
@@ -196,7 +213,10 @@ public class BookKeeperJournalManager implements JournalManager {
}
prepareBookKeeperEnv();
- bkc = new BookKeeper(new ClientConfiguration(), zkc);
+ ClientConfiguration clientConf = new ClientConfiguration();
+ clientConf.setSpeculativeReadTimeout(speculativeReadTimeout);
+ clientConf.setReadEntryTimeout(readEntryTimeout);
+ bkc = new BookKeeper(clientConf, zkc);
} catch (KeeperException e) {
throw new IOException("Error initializing zk", e);
} catch (InterruptedException ie) {
@@ -385,7 +405,7 @@ public class BookKeeperJournalManager implements JournalManager {
}
currentLedger = bkc.createLedger(ensembleSize, quorumSize,
BookKeeper.DigestType.MAC,
- digestpw.getBytes());
+ digestpw.getBytes(Charsets.UTF_8));
} catch (BKException bke) {
throw new IOException("Error creating ledger", bke);
} catch (KeeperException ke) {
@@ -522,10 +542,10 @@ public class BookKeeperJournalManager implements JournalManager {
LedgerHandle h;
if (l.isInProgress()) { // we don't want to fence the current journal
h = bkc.openLedgerNoRecovery(l.getLedgerId(),
- BookKeeper.DigestType.MAC, digestpw.getBytes());
+ BookKeeper.DigestType.MAC, digestpw.getBytes(Charsets.UTF_8));
} else {
h = bkc.openLedger(l.getLedgerId(), BookKeeper.DigestType.MAC,
- digestpw.getBytes());
+ digestpw.getBytes(Charsets.UTF_8));
}
elis = new BookKeeperEditLogInputStream(h, l);
elis.skipTo(fromTxId);
@@ -732,11 +752,11 @@ public class BookKeeperJournalManager implements JournalManager {
if (fence) {
lh = bkc.openLedger(l.getLedgerId(),
BookKeeper.DigestType.MAC,
- digestpw.getBytes());
+ digestpw.getBytes(Charsets.UTF_8));
} else {
lh = bkc.openLedgerNoRecovery(l.getLedgerId(),
BookKeeper.DigestType.MAC,
- digestpw.getBytes());
+ digestpw.getBytes(Charsets.UTF_8));
}
} catch (BKException bke) {
throw new IOException("Exception opening ledger for " + l, bke);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/08bc0c03/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperSpeculativeRead.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperSpeculativeRead.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperSpeculativeRead.java
new file mode 100644
index 0000000..f5b86bc
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperSpeculativeRead.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.contrib.bkjournal;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
+import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestBookKeeperSpeculativeRead {
+ private static final Log LOG = LogFactory
+ .getLog(TestBookKeeperSpeculativeRead.class);
+
+ private ZooKeeper zkc;
+ private static BKJMUtil bkutil;
+ private static int numLocalBookies = 1;
+ private static List<BookieServer> bks = new ArrayList<BookieServer>();
+
+ @BeforeClass
+ public static void setupBookkeeper() throws Exception {
+ bkutil = new BKJMUtil(1);
+ bkutil.start();
+ }
+
+ @AfterClass
+ public static void teardownBookkeeper() throws Exception {
+ bkutil.teardown();
+ for (BookieServer bk : bks) {
+ bk.shutdown();
+ }
+ }
+
+ @Before
+ public void setup() throws Exception {
+ zkc = BKJMUtil.connectZooKeeper();
+ }
+
+ @After
+ public void teardown() throws Exception {
+ zkc.close();
+ }
+
+ private NamespaceInfo newNSInfo() {
+ Random r = new Random();
+ return new NamespaceInfo(r.nextInt(), "testCluster", "TestBPID", -1);
+ }
+
+ /**
+ * Tests the speculative read feature supported by BookKeeper. Keeps one
+ * bookie alive and suspends all the others. A non-speculative client would
+ * hang for a long time trying to read the entries from BookKeeper.
+ */
+ @Test(timeout = 120000)
+ public void testSpeculativeRead() throws Exception {
+ // start 9 more bookies, giving 10 in total with the one from setup
+ for (int i = 1; i < 10; i++) {
+ bks.add(bkutil.newBookie());
+ }
+ NamespaceInfo nsi = newNSInfo();
+ Configuration conf = new Configuration();
+ int ensembleSize = numLocalBookies + 9;
+ conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
+ ensembleSize);
+ conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
+ ensembleSize);
+ conf.setInt(
+ BookKeeperJournalManager.BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_MS,
+ 100);
+ // set the read entry timeout to 60 minutes (3600 seconds)
+ conf.setInt(
+ BookKeeperJournalManager.BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_SEC, 3600);
+ BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
+ BKJMUtil.createJournalURI("/hdfsjournal-specread"), nsi);
+ bkjm.format(nsi);
+
+ final long numTransactions = 1000;
+ EditLogOutputStream out = bkjm.startLogSegment(1,
+ NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+ for (long i = 1; i <= numTransactions; i++) {
+ FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+ op.setTransactionId(i);
+ out.write(op);
+ }
+ out.close();
+ bkjm.finalizeLogSegment(1, numTransactions);
+
+ List<EditLogInputStream> in = new ArrayList<EditLogInputStream>();
+ bkjm.selectInputStreams(in, 1, true);
+
+ // suspend the 9 extra bookies, so that only one bookie is still running
+ // and responding to clients
+ CountDownLatch sleepLatch = new CountDownLatch(1);
+ for (final BookieServer bookie : bks) {
+ sleepBookie(sleepLatch, bookie);
+ }
+ try {
+ assertEquals(numTransactions,
+ FSEditLogTestUtil.countTransactionsInStream(in.get(0)));
+ } finally {
+ in.get(0).close();
+ sleepLatch.countDown();
+ bkjm.close();
+ }
+ }
+
+ /**
+ * Suspend a bookie until the latch is counted down or 2 minutes elapse
+ *
+ * @param latch
+ * latch to wait on
+ * @param bookie
+ * bookie server
+ * @throws Exception
+ */
+ private void sleepBookie(final CountDownLatch latch, final BookieServer bookie)
+ throws Exception {
+
+ Thread sleeper = new Thread() {
+ public void run() {
+ try {
+ bookie.suspendProcessing();
+ latch.await(2, TimeUnit.MINUTES);
+ bookie.resumeProcessing();
+ } catch (Exception e) {
+ LOG.error("Error suspending bookie", e);
+ }
+ }
+ };
+ sleeper.setName("BookieServerSleeper-" + bookie.getBookie().getId());
+ sleeper.start();
+ }
+}