You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/01/31 19:14:38 UTC
[bookkeeper] branch master updated: [MERGE YAHOO REPO] Support
DbLedgerStorage in LedgerCmd to get list of logger files for a given
ledgerId
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 5ed8768 [MERGE YAHOO REPO] Support DbLedgerStorage in LedgerCmd to get list of logger files for a given ledgerId
5ed8768 is described below
commit 5ed8768d9ab267111cf2eafcfd0b570b9ab9646c
Author: Jia Zhai <zh...@apache.org>
AuthorDate: Wed Jan 31 11:14:31 2018 -0800
[MERGE YAHOO REPO] Support DbLedgerStorage in LedgerCmd to get list of logger files for a given ledgerId
Descriptions of the changes in this PR:
This is cherry-pick from yahoo repo of branch yahoo-4.3.
original change is:
https://github.com/yahoo/bookkeeper/commit/384414b6
Support DbLedgerStorage in LedgerCmd to get list of logger files for a given ledgerId
Author: Jia Zhai <zh...@apache.org>
Author: Rajan <dh...@gmail.com>
Reviewers: Sijie Guo <si...@apache.org>
This closes #1057 from jiazhai/cherry_picks/i_209
---
.../org/apache/bookkeeper/bookie/BookieShell.java | 26 +++--
.../bookie/storage/ldb/DbLedgerStorage.java | 53 +++++++++-
.../apache/bookkeeper/client/LedgerCmdTest.java | 111 +++++++++++++++++++++
3 files changed, 180 insertions(+), 10 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index 44fcaa9..477f66b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -674,12 +674,26 @@ public class BookieShell implements Tool {
printUsage();
return -1;
}
- if (printMeta) {
- // print meta
- readLedgerMeta(ledgerId);
+
+ if (bkConf.getLedgerStorageClass().equals(DbLedgerStorage.class.getName())) {
+ // dump ledger info
+ try {
+ DbLedgerStorage.readLedgerIndexEntries(ledgerId, bkConf,
+ (currentEntry, entryLogId, position) -> System.out.println(
+ "entry " + currentEntry + "\t:\t(log: " + entryLogId + ", pos: " + position + ")"));
+ } catch (IOException e) {
+ System.err.printf("ERROR: initializing dbLedgerStorage %s", e.getMessage());
+ return -1;
+ }
+ } else {
+ if (printMeta) {
+ // print meta
+ readLedgerMeta(ledgerId);
+ }
+ // dump ledger info
+ readLedgerIndexEntries(ledgerId);
}
- // dump ledger info
- readLedgerIndexEntries(ledgerId);
+
return 0;
}
@@ -3002,7 +3016,7 @@ public class BookieShell implements Tool {
}
/**
- * Read ledger index entires.
+ * Read ledger index entries.
*
* @param ledgerId Ledger Id
* @throws IOException
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
index cb295d5..ec7a3b7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
@@ -21,14 +21,13 @@
package org.apache.bookkeeper.bookie.storage.ldb;
import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
-
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.DefaultThreadFactory;
-
import java.io.IOException;
import java.util.SortedMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -40,7 +39,6 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.Bookie.NoEntryException;
import org.apache.bookkeeper.bookie.BookieException;
@@ -56,13 +54,16 @@ import org.apache.bookkeeper.bookie.LedgerDirsManager;
import org.apache.bookkeeper.bookie.StateManager;
import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData;
import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch;
+import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.DiskChecker;
import org.apache.bookkeeper.util.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -773,7 +774,6 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
return numberOfEntries.get();
}
-
@Override
public void registerLedgerDeletionListener(LedgerDeletionListener listener) {
ledgerDeletionListeners.add(listener);
@@ -791,5 +791,50 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
logger.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
}
+ /**
+ * Reads ledger index entries to get list of entry-logger that contains given ledgerId.
+ *
+ * @param ledgerId
+ * @param serverConf
+ * @param processor
+ * @throws IOException
+ */
+ public static void readLedgerIndexEntries(long ledgerId, ServerConfiguration serverConf,
+ LedgerLoggerProcessor processor) throws IOException {
+
+ checkNotNull(serverConf, "ServerConfiguration can't be null");
+ checkNotNull(processor, "LedgerLoggger info processor can't null");
+
+ LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(serverConf, serverConf.getLedgerDirs(),
+ new DiskChecker(serverConf.getDiskUsageThreshold(), serverConf.getDiskUsageWarnThreshold()));
+ String ledgerBasePath = ledgerDirsManager.getAllLedgerDirs().get(0).toString();
+
+ EntryLocationIndex entryLocationIndex = new EntryLocationIndex(serverConf,
+ (path, dbConfigType, conf1) -> new KeyValueStorageRocksDB(path, DbConfigType.Small, conf1, true),
+ ledgerBasePath, NullStatsLogger.INSTANCE);
+ try {
+ long lastEntryId = entryLocationIndex.getLastEntryInLedger(ledgerId);
+ for (long currentEntry = 0; currentEntry <= lastEntryId; currentEntry++) {
+ long offset = entryLocationIndex.getLocation(ledgerId, currentEntry);
+ if (offset <= 0) {
+ // entry not found in this bookie
+ continue;
+ }
+ long entryLogId = offset >> 32L;
+ long position = offset & 0xffffffffL;
+ processor.process(currentEntry, entryLogId, position);
+ }
+ } finally {
+ entryLocationIndex.close();
+ }
+ }
+
+ /**
+ * Interface which process ledger logger.
+ */
+ public interface LedgerLoggerProcessor {
+ void process(long entryId, long entryLogId, long position);
+ }
+
private static final Logger log = LoggerFactory.getLogger(DbLedgerStorage.class);
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCmdTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCmdTest.java
new file mode 100644
index 0000000..4055917
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCmdTest.java
@@ -0,0 +1,111 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import static junit.framework.TestCase.assertEquals;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.bookie.BookieAccessor;
+import org.apache.bookkeeper.bookie.BookieShell;
+import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.util.EntryFormatter;
+import org.apache.bookkeeper.util.LedgerIdFormatter;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A test for bookieshell ledger command.
+ */
+public class LedgerCmdTest extends BookKeeperClusterTestCase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LedgerCmdTest.class);
+ private DigestType digestType = DigestType.CRC32;
+ private static final String PASSWORD = "testPasswd";
+
+ public LedgerCmdTest() {
+ super(1);
+ baseConf.setLedgerStorageClass(DbLedgerStorage.class.getName());
+ baseConf.setGcWaitTime(60000);
+ baseConf.setFlushInterval(1);
+ }
+
+
+ /**
+ * list of entry logger files that contains given ledgerId.
+ */
+ @Test
+ public void testLedgerDbStorageCmd() throws Exception {
+
+ BookKeeper bk = new BookKeeper(baseClientConf, zkc);
+ LOG.info("Create ledger and add entries to it");
+ LedgerHandle lh1 = createLedgerWithEntries(bk, 10);
+
+ bs.forEach(bookieServer -> {
+ try {
+ BookieAccessor.forceFlush(bookieServer.getBookie());
+ } catch (IOException e) {
+ LOG.error("Error forceFlush:", e);
+ }
+ });
+
+ String[] argv = { "ledger", Long.toString(lh1.getId()) };
+ final ServerConfiguration conf = bsConfs.get(0);
+ conf.setUseHostNameAsBookieID(true);
+
+ BookieShell bkShell =
+ new BookieShell(LedgerIdFormatter.LONG_LEDGERID_FORMATTER, EntryFormatter.STRING_FORMATTER);
+ bkShell.setConf(conf);
+
+ assertEquals("Failed to return exit code!", 0, bkShell.run(argv));
+
+ }
+
+ private LedgerHandle createLedgerWithEntries(BookKeeper bk, int numOfEntries) throws Exception {
+ LedgerHandle lh = bk.createLedger(1, 1, digestType, PASSWORD.getBytes());
+ final AtomicInteger rc = new AtomicInteger(BKException.Code.OK);
+ final CountDownLatch latch = new CountDownLatch(numOfEntries);
+
+ final AddCallback cb = new AddCallback() {
+ public void addComplete(int rccb, LedgerHandle lh, long entryId, Object ctx) {
+ rc.compareAndSet(BKException.Code.OK, rccb);
+ latch.countDown();
+ }
+ };
+ for (int i = 0; i < numOfEntries; i++) {
+ lh.asyncAddEntry(("foobar" + i).getBytes(), cb, null);
+ }
+ if (!latch.await(30, TimeUnit.SECONDS)) {
+ throw new Exception("Entries took too long to add");
+ }
+ if (rc.get() != BKException.Code.OK) {
+ throw BKException.create(rc.get());
+ }
+ return lh;
+ }
+}
--
To stop receiving notification emails like this one, please contact
sijie@apache.org.