You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/12/12 11:20:41 UTC
svn commit: r1420607 - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/
bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/
bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ bookkeeper-s...
Author: ivank
Date: Wed Dec 12 10:20:38 2012
New Revision: 1420607
URL: http://svn.apache.org/viewvc?rev=1420607&view=rev
Log:
BOOKKEEPER-205: implement a MetaStore based ledger manager for bookkeeper client. (jiannan via ivank)
Added:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/metastore/TestMetaStore.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieLedgerIndexTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerMultiDigestTestCase.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerTestCase.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1420607&r1=1420606&r2=1420607&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Wed Dec 12 10:20:38 2012
@@ -238,6 +238,8 @@ Trunk (unreleased changes)
BOOKKEEPER-469: Remove System.out.println from TestLedgerManager (ivank via fpj)
+ BOOKKEEPER-205: implement a MetaStore based ledger manager for bookkeeper client. (jiannan via ivank)
+
hedwig-server:
BOOKKEEPER-250: Need a ledger manager like interface to manage metadata operations in Hedwig (sijie via ivank)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java?rev=1420607&r1=1420606&r2=1420607&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java Wed Dec 12 10:20:38 2012
@@ -62,6 +62,8 @@ public abstract class BKException extend
return new BKBookieHandleNotAvailableException();
case Code.ZKException:
return new ZKException();
+ case Code.MetaStoreException:
+ return new MetaStoreException();
case Code.LedgerRecoveryException:
return new BKLedgerRecoveryException();
case Code.LedgerClosedException:
@@ -114,6 +116,7 @@ public abstract class BKException extend
int InterruptedException = -15;
int ProtocolVersionException = -16;
int MetadataVersionException = -17;
+ int MetaStoreException = -18;
int IllegalOpException = -100;
int LedgerFencedException = -101;
@@ -152,6 +155,8 @@ public abstract class BKException extend
return "Bookie handle is not available";
case Code.ZKException:
return "Error while using ZooKeeper";
+ case Code.MetaStoreException:
+ return "Error while using MetaStore";
case Code.LedgerRecoveryException:
return "Error while recovering ledger";
case Code.LedgerClosedException:
@@ -265,6 +270,12 @@ public abstract class BKException extend
}
}
+ public static class MetaStoreException extends BKException {
+ public MetaStoreException() {
+ super(Code.MetaStoreException);
+ }
+ }
+
public static class BKLedgerRecoveryException extends BKException {
public BKLedgerRecoveryException() {
super(Code.LedgerRecoveryException);
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java?rev=1420607&r1=1420606&r2=1420607&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java Wed Dec 12 10:20:38 2012
@@ -53,6 +53,10 @@ public abstract class AbstractConfigurat
protected final static String AVAILABLE_NODE = "available";
protected final static String REREPLICATION_ENTRY_BATCH_SIZE = "rereplicationEntryBatchSize";
+ // Metastore settings, only being used when LEDGER_MANAGER_FACTORY_CLASS is MSLedgerManagerFactory
+ protected final static String METASTORE_IMPL_CLASS = "metastoreImplClass";
+ protected final static String METASTORE_MAX_ENTRIES_PER_SCAN = "metastoreMaxEntriesPerScan";
+
protected AbstractConfiguration() {
super();
// add configuration for system properties
@@ -191,4 +195,42 @@ public abstract class AbstractConfigurat
public long getRereplicationEntryBatchSize() {
return getLong(REREPLICATION_ENTRY_BATCH_SIZE, 10);
}
+
+ /**
+ * Get metastore implementation class.
+ *
+ * @return metastore implementation class name.
+ */
+ public String getMetastoreImplClass() {
+ return getString(METASTORE_IMPL_CLASS);
+ }
+
+ /**
+ * Set metastore implementation class.
+ *
+ * @param metastoreImplClass
+ * Metastore implementation Class name.
+ */
+ public void setMetastoreImplClass(String metastoreImplClass) {
+ setProperty(METASTORE_IMPL_CLASS, metastoreImplClass);
+ }
+
+ /**
+ * Get max entries per scan in metastore.
+ *
+ * @return max entries per scan in metastore.
+ */
+ public int getMetastoreMaxEntriesPerScan() {
+ return getInt(METASTORE_MAX_ENTRIES_PER_SCAN, 50);
+ }
+
+ /**
+ * Set max entries per scan in metastore.
+ *
+ * @param maxEntries
+ * Max entries per scan in metastore.
+ */
+ public void setMetastoreMaxEntriesPerScan(int maxEntries) {
+ setProperty(METASTORE_MAX_ENTRIES_PER_SCAN, maxEntries);
+ }
}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java?rev=1420607&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java Wed Dec 12 10:20:38 2012
@@ -0,0 +1,691 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.meta;
+
+import static org.apache.bookkeeper.metastore.MetastoreScannableTable.EMPTY_END_KEY;
+import static org.apache.bookkeeper.metastore.MetastoreTable.ALL_FIELDS;
+import static org.apache.bookkeeper.metastore.MetastoreTable.NON_FIELDS;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.bookkeeper.metastore.MSException;
+import org.apache.bookkeeper.metastore.MetaStore;
+import org.apache.bookkeeper.metastore.MetastoreCallback;
+import org.apache.bookkeeper.metastore.MetastoreCursor;
+import org.apache.bookkeeper.metastore.MetastoreCursor.ReadEntriesCallback;
+import org.apache.bookkeeper.metastore.MetastoreException;
+import org.apache.bookkeeper.metastore.MetastoreFactory;
+import org.apache.bookkeeper.metastore.MetastoreScannableTable;
+import org.apache.bookkeeper.metastore.MetastoreScannableTable.Order;
+import org.apache.bookkeeper.metastore.MetastoreTableItem;
+import org.apache.bookkeeper.metastore.Value;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
+import org.apache.bookkeeper.replication.ReplicationException;
+import org.apache.bookkeeper.util.StringUtils;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.AsyncCallback.StringCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MetaStore Based Ledger Manager Factory
+ */
+public class MSLedgerManagerFactory extends LedgerManagerFactory {
+
+ static Logger LOG = LoggerFactory.getLogger(MSLedgerManagerFactory.class);
+
+ public static final int CUR_VERSION = 1;
+
+ public static final String TABLE_NAME = "LEDGER";
+ public static final String META_FIELD = ".META";
+
+ AbstractConfiguration conf;
+ ZooKeeper zk;
+ MetaStore metastore;
+
+ @Override
+ public int getCurrentVersion() {
+ return CUR_VERSION;
+ }
+
+ @Override
+ public LedgerManagerFactory initialize(final AbstractConfiguration conf, final ZooKeeper zk,
+ final int factoryVersion) throws IOException {
+ if (CUR_VERSION != factoryVersion) {
+ throw new IOException("Incompatible layout version found : " + factoryVersion);
+ }
+ this.conf = conf;
+ this.zk = zk;
+
+ // load metadata store
+ String msName = conf.getMetastoreImplClass();
+ try {
+ metastore = MetastoreFactory.createMetaStore(msName);
+
+ // TODO: should record version in somewhere. e.g. ZooKeeper
+ int msVersion = metastore.getVersion();
+ metastore.init(conf, msVersion);
+ } catch (Throwable t) {
+ throw new IOException("Failed to initialize metastore " + msName + " : ", t);
+ }
+
+ return this;
+ }
+
+ @Override
+ public void uninitialize() throws IOException {
+ metastore.close();
+ }
+
+ static Long key2LedgerId(String key) {
+ return null == key ? null : Long.parseLong(key, 10);
+ }
+
+ static String ledgerId2Key(Long lid) {
+ return null == lid ? null : StringUtils.getZKStringId(lid);
+ }
+
+ static String rangeToString(Long firstLedger, boolean firstInclusive, Long lastLedger, boolean lastInclusive) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(firstInclusive ? "[ " : "( ").append(firstLedger).append(" ~ ").append(lastLedger)
+ .append(lastInclusive ? " ]" : " )");
+ return sb.toString();
+ }
+
+ static SortedSet<Long> entries2Ledgers(Iterator<MetastoreTableItem> entries) {
+ SortedSet<Long> ledgers = new TreeSet<Long>();
+ while (entries.hasNext()) {
+ MetastoreTableItem item = entries.next();
+ try {
+ ledgers.add(key2LedgerId(item.getKey()));
+ } catch (NumberFormatException nfe) {
+ LOG.warn("Found invalid ledger key {}", item.getKey());
+ }
+ }
+ return ledgers;
+ }
+
+ static class SyncResult<T> {
+ T value;
+ int rc;
+ boolean finished = false;
+
+ public synchronized void complete(int rc, T value) {
+ this.rc = rc;
+ this.value = value;
+ finished = true;
+
+ notify();
+ }
+
+ public synchronized void block() {
+ try {
+ while (!finished) {
+ wait();
+ }
+ } catch (InterruptedException ie) {
+ }
+ }
+
+ public synchronized int getRetCode() {
+ return rc;
+ }
+
+ public synchronized T getResult() {
+ return value;
+ }
+ }
+
+ static abstract class AbstractMsLedgerManager implements Closeable {
+
+ final ZooKeeper zk;
+ final AbstractConfiguration conf;
+
+ final MetaStore metastore;
+ final MetastoreScannableTable ledgerTable;
+ final int maxEntriesPerScan;
+
+ AbstractMsLedgerManager(final AbstractConfiguration conf, final ZooKeeper zk, final MetaStore metastore) {
+ this.conf = conf;
+ this.zk = zk;
+ this.metastore = metastore;
+
+ try {
+ ledgerTable = metastore.createScannableTable(TABLE_NAME);
+ } catch (MetastoreException mse) {
+ LOG.error("Failed to instantiate table " + TABLE_NAME + " in metastore " + metastore.getName());
+ throw new RuntimeException("Failed to instantiate table " + TABLE_NAME + " in metastore "
+ + metastore.getName());
+ }
+ // configuration settings
+ maxEntriesPerScan = conf.getMetastoreMaxEntriesPerScan();
+ }
+
+ @Override
+ public void close() {
+ ledgerTable.close();
+ }
+
+ }
+
+ static class MsLedgerManager extends AbstractMsLedgerManager implements LedgerManager {
+
+ static final String IDGEN_ZNODE = "ms-idgen";
+ static final String IDGENERATION_PREFIX = "/" + IDGEN_ZNODE + "/ID-";
+
+ // Path to generate global id
+ private final String idGenPath;
+
+ // we use this to prevent long stack chains from building up in
+ // callbacks
+ ScheduledExecutorService scheduler;
+
+ MsLedgerManager(final AbstractConfiguration conf, final ZooKeeper zk, final MetaStore metastore) {
+ super(conf, zk, metastore);
+ this.idGenPath = conf.getZkLedgersRootPath() + IDGENERATION_PREFIX;
+ this.scheduler = Executors.newSingleThreadScheduledExecutor();
+ }
+
+ @Override
+ public void close() {
+ try {
+ scheduler.shutdown();
+ } catch (Exception e) {
+ LOG.warn("Error when closing MsLedgerManager : ", e);
+ }
+ super.close();
+ }
+
+ @Override
+ public void createLedger(final LedgerMetadata metadata, final GenericCallback<Long> ledgerCb) {
+ ZkUtils.createFullPathOptimistic(zk, idGenPath, new byte[0], Ids.OPEN_ACL_UNSAFE,
+ CreateMode.EPHEMERAL_SEQUENTIAL, new StringCallback() {
+ @Override
+ public void processResult(int rc, String path, Object ctx, final String idPathName) {
+ if (rc != KeeperException.Code.OK.intValue()) {
+ LOG.error("Could not generate new ledger id",
+ KeeperException.create(KeeperException.Code.get(rc), path));
+ ledgerCb.operationComplete(BKException.Code.ZKException, null);
+ return;
+ }
+ /*
+ * Extract ledger id from gen path
+ */
+ long ledgerId;
+ try {
+ ledgerId = getLedgerIdFromGenPath(idPathName);
+ } catch (IOException e) {
+ LOG.error("Could not extract ledger-id from id gen path:" + path, e);
+ ledgerCb.operationComplete(BKException.Code.ZKException, null);
+ return;
+ }
+
+ final long lid = ledgerId;
+ MetastoreCallback<Version> msCallback = new MetastoreCallback<Version>() {
+ @Override
+ public void complete(int rc, Version version, Object ctx) {
+ if (MSException.Code.BadVersion.getCode() == rc) {
+ ledgerCb.operationComplete(BKException.Code.MetadataVersionException, null);
+ return;
+ }
+ if (MSException.Code.OK.getCode() != rc) {
+ ledgerCb.operationComplete(BKException.Code.MetaStoreException, null);
+ return;
+ }
+ LOG.debug("Create ledger {} with version {} successfuly.", new Object[] { lid,
+ version });
+ // update version
+ metadata.setVersion(version);
+ ledgerCb.operationComplete(BKException.Code.OK, lid);
+ }
+ };
+
+ ledgerTable.put(ledgerId2Key(lid), new Value().setField(META_FIELD, metadata.serialize()),
+ Version.NEW, msCallback, null);
+ zk.delete(idPathName, -1, new AsyncCallback.VoidCallback() {
+ @Override
+ public void processResult(int rc, String path, Object ctx) {
+ if (rc != KeeperException.Code.OK.intValue()) {
+ LOG.warn("Exception during deleting znode for id generation : ",
+ KeeperException.create(KeeperException.Code.get(rc), path));
+ } else {
+ LOG.debug("Deleting znode for id generation : {}", idPathName);
+ }
+ }
+ }, null);
+ }
+ }, null);
+ }
+
+ // get ledger id from generation path
+ private long getLedgerIdFromGenPath(String nodeName) throws IOException {
+ long ledgerId;
+ try {
+ String parts[] = nodeName.split(IDGENERATION_PREFIX);
+ ledgerId = Long.parseLong(parts[parts.length - 1]);
+ } catch (NumberFormatException e) {
+ throw new IOException(e);
+ }
+ return ledgerId;
+ }
+
+ @Override
+ public void deleteLedger(final long ledgerId, final GenericCallback<Void> cb) {
+ MetastoreCallback<Void> msCallback = new MetastoreCallback<Void>() {
+ @Override
+ public void complete(int rc, Void value, Object ctx) {
+ int bkRc;
+ if (MSException.Code.NoKey.getCode() == rc) {
+ LOG.warn("Ledger entry does not exist in meta table: ledgerId={}", ledgerId);
+ bkRc = BKException.Code.NoSuchLedgerExistsException;
+ } else if (MSException.Code.OK.getCode() == rc) {
+ bkRc = BKException.Code.OK;
+ } else {
+ bkRc = BKException.Code.MetaStoreException;
+ }
+ cb.operationComplete(bkRc, (Void) null);
+ }
+ };
+ ledgerTable.remove(ledgerId2Key(ledgerId), Version.ANY, msCallback, null);
+ }
+
+ @Override
+ public void readLedgerMetadata(final long ledgerId, final GenericCallback<LedgerMetadata> readCb) {
+ final String key = ledgerId2Key(ledgerId);
+ MetastoreCallback<Versioned<Value>> msCallback = new MetastoreCallback<Versioned<Value>>() {
+ @Override
+ public void complete(int rc, Versioned<Value> value, Object ctx) {
+ if (MSException.Code.NoKey.getCode() == rc) {
+ LOG.error("No ledger metadata found for ledger " + ledgerId + " : ",
+ MSException.create(MSException.Code.get(rc), "No key " + key + " found."));
+ readCb.operationComplete(BKException.Code.NoSuchLedgerExistsException, null);
+ return;
+ }
+ if (MSException.Code.OK.getCode() != rc) {
+ LOG.error("Could not read metadata for ledger " + ledgerId + " : ",
+ MSException.create(MSException.Code.get(rc), "Failed to get key " + key));
+ readCb.operationComplete(BKException.Code.MetaStoreException, null);
+ return;
+ }
+ LedgerMetadata metadata;
+ try {
+ metadata = LedgerMetadata
+ .parseConfig(value.getValue().getField(META_FIELD), value.getVersion());
+ } catch (IOException e) {
+ LOG.error("Could not parse ledger metadata for ledger " + ledgerId + " : ", e);
+ readCb.operationComplete(BKException.Code.MetaStoreException, null);
+ return;
+ }
+ readCb.operationComplete(BKException.Code.OK, metadata);
+ }
+ };
+ ledgerTable.get(key, msCallback, ALL_FIELDS);
+ }
+
+ @Override
+ public void writeLedgerMetadata(final long ledgerId, final LedgerMetadata metadata,
+ final GenericCallback<Void> cb) {
+ Value data = new Value().setField(META_FIELD, metadata.serialize());
+
+ LOG.debug("Writing ledger {} metadata, version {}", new Object[] { ledgerId, metadata.getVersion() });
+
+ final String key = ledgerId2Key(ledgerId);
+ MetastoreCallback<Version> msCallback = new MetastoreCallback<Version>() {
+ @Override
+ public void complete(int rc, Version version, Object ctx) {
+ int bkRc;
+ if (MSException.Code.BadVersion.getCode() == rc) {
+ LOG.info("Bad version provided to updat metadata for ledger {}", ledgerId);
+ bkRc = BKException.Code.MetadataVersionException;
+ } else if (MSException.Code.NoKey.getCode() == rc) {
+ LOG.warn("Ledger {} doesn't exist when writing its ledger metadata.", ledgerId);
+ bkRc = BKException.Code.NoSuchLedgerExistsException;
+ } else if (MSException.Code.OK.getCode() == rc) {
+ metadata.setVersion(version);
+ bkRc = BKException.Code.OK;
+ } else {
+ LOG.warn("Conditional update ledger metadata failed: ",
+ MSException.create(MSException.Code.get(rc), "Failed to put key " + key));
+ bkRc = BKException.Code.MetaStoreException;
+ }
+
+ cb.operationComplete(bkRc, null);
+ }
+ };
+ ledgerTable.put(key, data, metadata.getVersion(), msCallback, null);
+ }
+
+ @Override
+ public void asyncProcessLedgers(final Processor<Long> processor, final AsyncCallback.VoidCallback finalCb,
+ final Object context, final int successRc, final int failureRc) {
+ MetastoreCallback<MetastoreCursor> openCursorCb = new MetastoreCallback<MetastoreCursor>() {
+ @Override
+ public void complete(int rc, MetastoreCursor cursor, Object ctx) {
+ if (MSException.Code.OK.getCode() != rc) {
+ finalCb.processResult(failureRc, null, context);
+ return;
+ }
+ if (!cursor.hasMoreEntries()) {
+ finalCb.processResult(successRc, null, context);
+ return;
+ }
+ asyncProcessLedgers(cursor, processor, finalCb, context, successRc, failureRc);
+ }
+ };
+ ledgerTable.openCursor(NON_FIELDS, openCursorCb, null);
+ }
+
+ void asyncProcessLedgers(final MetastoreCursor cursor, final Processor<Long> processor,
+ final AsyncCallback.VoidCallback finalCb, final Object context,
+ final int successRc, final int failureRc) {
+ scheduler.submit(new Runnable() {
+ @Override
+ public void run() {
+ doAsyncProcessLedgers(cursor, processor, finalCb, context, successRc, failureRc);
+ }
+ });
+ }
+
+ void doAsyncProcessLedgers(final MetastoreCursor cursor, final Processor<Long> processor,
+ final AsyncCallback.VoidCallback finalCb, final Object context,
+ final int successRc, final int failureRc) {
+ // no entries now
+ if (!cursor.hasMoreEntries()) {
+ finalCb.processResult(successRc, null, context);
+ return;
+ }
+ ReadEntriesCallback msCallback = new ReadEntriesCallback() {
+ @Override
+ public void complete(int rc, Iterator<MetastoreTableItem> entries, Object ctx) {
+ if (MSException.Code.OK.getCode() != rc) {
+ finalCb.processResult(failureRc, null, context);
+ return;
+ }
+
+ SortedSet<Long> ledgers = new TreeSet<Long>();
+ while (entries.hasNext()) {
+ MetastoreTableItem item = entries.next();
+ try {
+ ledgers.add(key2LedgerId(item.getKey()));
+ } catch (NumberFormatException nfe) {
+ LOG.warn("Found invalid ledger key {}", item.getKey());
+ }
+ }
+
+ if (0 == ledgers.size()) {
+ // process next batch of ledgers
+ asyncProcessLedgers(cursor, processor, finalCb, context, successRc, failureRc);
+ return;
+ }
+
+ final long startLedger = ledgers.first();
+ final long endLedger = ledgers.last();
+
+ AsyncSetProcessor<Long> setProcessor = new AsyncSetProcessor<Long>(scheduler);
+ // process set
+ setProcessor.process(ledgers, processor, new AsyncCallback.VoidCallback() {
+ @Override
+ public void processResult(int rc, String path, Object ctx) {
+ if (successRc != rc) {
+ LOG.error("Failed when processing range "
+ + rangeToString(startLedger, true, endLedger, true));
+ finalCb.processResult(failureRc, null, context);
+ return;
+ }
+ // process next batch of ledgers
+ asyncProcessLedgers(cursor, processor, finalCb, context, successRc, failureRc);
+ }
+ }, context, successRc, failureRc);
+ }
+ };
+ cursor.asyncReadEntries(maxEntriesPerScan, msCallback, null);
+ }
+ }
+
+ static class MsActiveLedgerManager extends AbstractMsLedgerManager implements ActiveLedgerManager {
+
+ // A sorted map to stored all active ledger ids
+ protected final SnapshotMap<Long, Boolean> activeLedgers;
+
+ MsActiveLedgerManager(final AbstractConfiguration conf, final ZooKeeper zk, final MetaStore metastore) {
+ super(conf, zk, metastore);
+ activeLedgers = new SnapshotMap<Long, Boolean>();
+ }
+
+ @Override
+ public void addActiveLedger(long ledgerId, boolean active) {
+ activeLedgers.put(ledgerId, active);
+ }
+
+ @Override
+ public void removeActiveLedger(long ledgerId) {
+ activeLedgers.remove(ledgerId);
+ }
+
+ @Override
+ public boolean containsActiveLedger(long ledgerId) {
+ return activeLedgers.containsKey(ledgerId);
+ }
+
+ @Override
+ public void garbageCollectLedgers(GarbageCollector gc) {
+ LOG.debug("Start garbage collect ledgers.");
+ NavigableMap<Long, Boolean> snapshot = activeLedgers.snapshot();
+ Long nextLedger = 0L;
+ int numRetries = 3;
+ do {
+ nextLedger = doGcLedgers(nextLedger, snapshot, gc);
+ } while (null != nextLedger && --numRetries > 0);
+ LOG.debug("End garbage collect ledgers.");
+ }
+
+ /**
+ * Do garbage collection starting from <code>startLedger</code>.
+ *
+ * @param startLedgerId
+ * Start Ledger id
+ * @param snapshot
+ * Current snapshot of active ledgers
+ * @param gc
+ * Garbage collector
+ * @return null if finished scanning all ledgers, the next ledger id to
+ * scan
+ */
+ private Long doGcLedgers(Long startLedgerId, NavigableMap<Long, Boolean> snapshot, GarbageCollector gc) {
+ final SyncResult<MetastoreCursor> result = new SyncResult<MetastoreCursor>();
+ MetastoreCallback<MetastoreCursor> openCursorCb = new MetastoreCallback<MetastoreCursor>() {
+ @Override
+ public void complete(int rc, MetastoreCursor cursor, Object ctx) {
+ result.complete(rc, cursor);
+ }
+ };
+ ledgerTable.openCursor(ledgerId2Key(startLedgerId), true, EMPTY_END_KEY, true, Order.ASC, NON_FIELDS,
+ openCursorCb, null);
+ result.block();
+ if (MSException.Code.OK.getCode() != result.getRetCode()) {
+ LOG.warn("Failed to open metastore cursor to run garbage collection : ",
+ MSException.create(MSException.Code.get(result.getRetCode())));
+ // failed to open a cursor, not continue until next gc
+ return null;
+ }
+
+ MetastoreCursor cursor = result.getResult();
+
+ while (cursor.hasMoreEntries()) {
+ Iterator<MetastoreTableItem> entries;
+ try {
+ entries = cursor.readEntries(maxEntriesPerScan);
+ } catch (MSException mse) {
+ LOG.warn("Exception when garbage collecting deleted ledgers : ", mse);
+ return startLedgerId;
+ }
+
+ if (!entries.hasNext()) {
+ continue;
+ }
+
+ SortedSet<Long> msActiveLedgers = entries2Ledgers(entries);
+
+ Long endLedgerId = msActiveLedgers.last();
+ Map<Long, Boolean> bkActiveLedgers = snapshot.subMap(startLedgerId, true, endLedgerId, true);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("All active ledgers from Metastore between {} and {} : {}", new Object[] { startLedgerId,
+ endLedgerId, msActiveLedgers });
+ LOG.debug("Current active ledgers from Bookie between {} and {} : {}", new Object[] {
+ startLedgerId, endLedgerId, bkActiveLedgers });
+ }
+ doGc(gc, bkActiveLedgers, msActiveLedgers);
+ // move the pointer
+ startLedgerId = endLedgerId + 1;
+ }
+ doGc(gc, snapshot.tailMap(startLedgerId), new TreeSet<Long>());
+ return null;
+ }
+
+ /**
+ * Do garbage collecting comparing hosted ledgers and metastore ledgers
+ *
+ * @param gc
+ * Garbage collector to do garbage collection when found
+ * inactive/deleted ledgers
+ * @param bkActiveLedgers
+ * Active ledgers hosted in bookie server
+ * @param msAllLedgers
+ * All ledgers stored in metastore
+ */
+ void doGc(GarbageCollector gc, Map<Long, Boolean> bkActiveLedgers, Set<Long> msAllLedgers) {
+ // remove any active ledgers that doesn't exist in zk
+ for (Long bkLid : bkActiveLedgers.keySet()) {
+ if (!msAllLedgers.contains(bkLid)) {
+ // remove it from current active ledger
+ LOG.debug("gc ledger: {}", bkLid);
+ bkActiveLedgers.remove(bkLid);
+ gc.gc(bkLid);
+ }
+ }
+ }
+ }
+
+ @Override
+ public LedgerManager newLedgerManager() {
+ return new MsLedgerManager(conf, zk, metastore);
+ }
+
+ @Override
+ public ActiveLedgerManager newActiveLedgerManager() {
+ return new MsActiveLedgerManager(conf, zk, metastore);
+ }
+
+ @Override
+ public LedgerUnderreplicationManager newLedgerUnderreplicationManager() throws KeeperException,
+ InterruptedException, ReplicationException.CompatibilityException {
+ // TODO: currently just use zk ledger underreplication manager
+ return new ZkLedgerUnderreplicationManager(conf, zk);
+ }
+
+ /**
+ * Process set one by one in asynchronize way. Process will be stopped
+ * immediately when error occurred.
+ */
+ private static class AsyncSetProcessor<T> {
+ // use this to prevent long stack chains from building up in callbacks
+ ScheduledExecutorService scheduler;
+
+ /**
+ * Constructor
+ *
+ * @param scheduler
+ * Executor used to prevent long stack chains
+ */
+ public AsyncSetProcessor(ScheduledExecutorService scheduler) {
+ this.scheduler = scheduler;
+ }
+
+ /**
+ * Process set of items
+ *
+ * @param data
+ * Set of data to process
+ * @param processor
+ * Callback to process element of list when success
+ * @param finalCb
+ * Final callback to be called after all elements in the list
+ * are processed
+ * @param contxt
+ * Context of final callback
+ * @param successRc
+ * RC passed to final callback on success
+ * @param failureRc
+ * RC passed to final callback on failure
+ */
+ public void process(final Set<T> data, final Processor<T> processor, final AsyncCallback.VoidCallback finalCb,
+ final Object context, final int successRc, final int failureRc) {
+ if (data == null || data.size() == 0) {
+ finalCb.processResult(successRc, null, context);
+ return;
+ }
+ final Iterator<T> iter = data.iterator();
+ AsyncCallback.VoidCallback stubCallback = new AsyncCallback.VoidCallback() {
+ @Override
+ public void processResult(int rc, String path, Object ctx) {
+ if (rc != successRc) {
+ // terminal immediately
+ finalCb.processResult(failureRc, null, context);
+ return;
+ }
+ if (!iter.hasNext()) { // reach the end of list
+ finalCb.processResult(successRc, null, context);
+ return;
+ }
+ // process next element
+ final T dataToProcess = iter.next();
+ final AsyncCallback.VoidCallback stub = this;
+ scheduler.submit(new Runnable() {
+ @Override
+ public final void run() {
+ processor.process(dataToProcess, stub);
+ }
+ });
+ }
+ };
+ T firstElement = iter.next();
+ processor.process(firstElement, stubCallback);
+ }
+ }
+
+}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java?rev=1420607&r1=1420606&r2=1420607&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java Wed Dec 12 10:20:38 2012
@@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.test.MultiLedgerManagerMultiDigestTestCase;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.MSLedgerManagerFactory;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.client.AsyncCallback.RecoverCallback;
@@ -794,6 +795,7 @@ public class BookieRecoveryTest extends
adminConf.setZkServers(zkUtil.getZooKeeperConnectString());
adminConf.setBookieRecoveryDigestType(digestCorrect);
adminConf.setBookieRecoveryPasswd(passwdBad);
+ setMetastoreImplClass(adminConf);
BookKeeperAdmin bka = new BookKeeperAdmin(adminConf);
bka.recoverBookieData(bookieSrc, null);
@@ -816,6 +818,7 @@ public class BookieRecoveryTest extends
// Try to recover with no password in conf
adminConf = new ClientConfiguration();
adminConf.setZkServers(zkUtil.getZooKeeperConnectString());
+ setMetastoreImplClass(adminConf);
bka = new BookKeeperAdmin(adminConf);
bka.recoverBookieData(bookieSrc, null);
@@ -832,6 +835,13 @@ public class BookieRecoveryTest extends
*/
@Test
public void ensurePasswordUsedForOldLedgers() throws Exception {
+ // This test bases on creating old ledgers in version 4.1.0, which only
+ // supports ZooKeeper based flat and hierarchical LedgerManagerFactory.
+ // So we ignore it for MSLedgerManagerFactory.
+ if (MSLedgerManagerFactory.class.getName().equals(ledgerManagerFactory)) {
+ return;
+ }
+
// stop all bookies
// and wipe the ledger layout so we can use an old client
zkUtil.getZooKeeperClient().delete("/ledgers/LAYOUT", -1);
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/metastore/TestMetaStore.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/metastore/TestMetaStore.java?rev=1420607&r1=1420606&r2=1420607&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/metastore/TestMetaStore.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/metastore/TestMetaStore.java Wed Dec 12 10:20:38 2012
@@ -221,7 +221,7 @@ public class TestMetaStore extends TestC
}
protected Integer getRandom() {
- return (int)Math.random()*65536;
+ return (int)(Math.random()*65536);
}
protected Versioned<Value> getRecord(String recordId) throws Exception {
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieLedgerIndexTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieLedgerIndexTest.java?rev=1420607&r1=1420606&r2=1420607&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieLedgerIndexTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieLedgerIndexTest.java Wed Dec 12 10:20:38 2012
@@ -32,6 +32,7 @@ import org.apache.bookkeeper.client.Ledg
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.meta.MSLedgerManagerFactory;
import org.apache.bookkeeper.replication.ReplicationException.BKAuditException;
import org.apache.bookkeeper.test.MultiLedgerManagerTestCase;
import org.apache.commons.io.FileUtils;
@@ -132,6 +133,13 @@ public class BookieLedgerIndexTest exten
*/
@Test
public void testWithoutZookeeper() throws Exception {
+ // This test case is for ledger metadata that stored in ZooKeeper. As
+ // far as MSLedgerManagerFactory, ledger metadata are stored in other
+ // storage. So this test is not suitable for MSLedgerManagerFactory.
+ if (newLedgerManagerFactory instanceof MSLedgerManagerFactory) {
+ return;
+ }
+
for (int i = 0; i < numberOfLedgers; i++) {
createAndAddEntriesToLedger().close();
}
@@ -141,7 +149,7 @@ public class BookieLedgerIndexTest exten
stopZKCluster();
try {
bookieLedgerIndex.getBookieToLedgerIndex();
- fail("Must throw exception as bookies are not running!");
+ fail("Must throw exception as zookeeper are not running!");
} catch (BKAuditException bkAuditException) {
// expected behaviour
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java?rev=1420607&r1=1420606&r2=1420607&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java Wed Dec 12 10:20:38 2012
@@ -30,8 +30,10 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.bookkeeper.client.BookKeeperTestClient;
+import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.metastore.InMemoryMetaStore;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.commons.io.FileUtils;
@@ -74,6 +76,10 @@ public abstract class BookKeeperClusterT
@Override
public void setUp() throws Exception {
LOG.info("Setting up test {}", getName());
+ InMemoryMetaStore.reset();
+ setMetastoreImplClass(baseConf);
+ setMetastoreImplClass(baseClientConf);
+
try {
// start zookeeper service
startZKCluster();
@@ -382,4 +388,9 @@ public abstract class BookKeeperClusterT
return server;
}
+
+ public void setMetastoreImplClass(AbstractConfiguration conf) {
+ conf.setMetastoreImplClass(InMemoryMetaStore.class.getName());
+ }
+
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerMultiDigestTestCase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerMultiDigestTestCase.java?rev=1420607&r1=1420606&r2=1420607&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerMultiDigestTestCase.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerMultiDigestTestCase.java Wed Dec 12 10:20:38 2012
@@ -49,6 +49,7 @@ public abstract class MultiLedgerManager
String[] ledgerManagers = {
"org.apache.bookkeeper.meta.FlatLedgerManagerFactory",
"org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory",
+ "org.apache.bookkeeper.meta.MSLedgerManagerFactory",
};
ArrayList<Object[]> cfgs = new ArrayList<Object[]>(ledgerManagers.length);
DigestType[] digestTypes = new DigestType[] { DigestType.MAC, DigestType.CRC32 };
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerTestCase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerTestCase.java?rev=1420607&r1=1420606&r2=1420607&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerTestCase.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerTestCase.java Wed Dec 12 10:20:38 2012
@@ -45,6 +45,7 @@ public abstract class MultiLedgerManager
String[] ledgerManagers = new String[] {
"org.apache.bookkeeper.meta.FlatLedgerManagerFactory",
"org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory",
+ "org.apache.bookkeeper.meta.MSLedgerManagerFactory",
};
ArrayList<Object[]> cfgs = new ArrayList<Object[]>(ledgerManagers.length);
for (String lm : ledgerManagers) {