You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2014/12/02 18:20:51 UTC
[12/21] hbase git commit: HBASE-12522 Backport of write-ahead-log
refactoring and follow-ons.
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
new file mode 100644
index 0000000..497a4d8
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
@@ -0,0 +1,430 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.hbase.wal;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.io.InterruptedIOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.wal.WAL.Reader;
+import org.apache.hadoop.hbase.wal.WALProvider.Writer;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+// imports for things that haven't moved from regionserver.wal yet.
+import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
+import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
+import org.apache.hadoop.hbase.regionserver.wal.SequenceFileLogReader;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+
+/**
+ * Entry point for users of the Write Ahead Log.
+ * Acts as the shim between internal use and the particular WALProvider we use to handle wal
+ * requests.
+ *
+ * Configure which provider gets used with the configuration setting "hbase.wal.provider". Available
+ * implementations:
+ * <ul>
+ * <li><em>defaultProvider</em> : whatever provider is standard for the hbase version.</li>
+ * </ul>
+ *
+ * Alternatively, you may provide a custom implementation of {@link WALProvider} by class name.
+ */
+@InterfaceAudience.Private
+public class WALFactory {
+
+ private static final Log LOG = LogFactory.getLog(WALFactory.class);
+
+ /**
+  * Short configuration names for the built-in {@link WALProvider} implementations.
+  * The name of each constant is what a user writes as the value of a provider
+  * configuration key; {@code clazz} is the implementation that name stands for.
+  */
+ static enum Providers {
+   defaultProvider(DefaultWALProvider.class);
+
+   Class<? extends WALProvider> clazz;
+
+   Providers(Class<? extends WALProvider> providerClass) {
+     clazz = providerClass;
+   }
+ }
+
+ static final String WAL_PROVIDER = "hbase.wal.provider";
+ static final String DEFAULT_WAL_PROVIDER = Providers.defaultProvider.name();
+
+ static final String META_WAL_PROVIDER = "hbase.wal.meta_provider";
+ static final String DEFAULT_META_WAL_PROVIDER = Providers.defaultProvider.name();
+
+ final String factoryId;
+ final WALProvider provider;
+ // The meta updates are written to a different wal. If this
+ // regionserver holds meta regions, then this ref will be non-null.
+ // lazily initialized; most RegionServers don't deal with META
+ final AtomicReference<WALProvider> metaProvider = new AtomicReference<WALProvider>();
+
+ /**
+ * Configuration-specified WAL Reader used when a custom reader is requested
+ */
+ private final Class<? extends DefaultWALProvider.Reader> logReaderClass;
+
+ /**
+ * How long to attempt opening in-recovery wals
+ */
+ private final int timeoutMillis;
+
+ private final Configuration conf;
+
+ // Used for the singleton WALFactory, see below.
+ private WALFactory(Configuration conf) {
+   // Same early initialization as the public constructor, repeated here so that every member
+   // can stay final. Reader/writer settings have to be in place before any provider could be
+   // initialized, because providers may need to instantiate a reader/writer.
+   this.timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
+   /* TODO Both of these are probably specific to the fs wal provider */
+   this.logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
+       ProtobufLogReader.class, DefaultWALProvider.Reader.class);
+   this.conf = conf;
+   // end required early initialization
+
+   // The singleton only hands out readers/writers; it can't create wals, so it has no provider.
+   this.provider = null;
+   this.factoryId = SINGLETON_ID;
+ }
+
+ /**
+  * Instantiate and initialize a provider named by a configuration property.
+  * Requires conf to have already been set (as well as anything the provider might need to read).
+  * @param key configuration key holding either a {@link Providers} name or a class name
+  * @param defaultValue value to use when the key is not set
+  * @param listeners handed to the new provider's init
+  * @param providerId handed to the new provider's init
+  * @return the initialized provider
+  * @throws IOException if the provider class can't be instantiated or accessed
+  */
+ WALProvider getProvider(final String key, final String defaultValue,
+     final List<WALActionsListener> listeners, final String providerId) throws IOException {
+   Class<? extends WALProvider> providerClass;
+   try {
+     // First try to read the value as one of our short provider names.
+     providerClass = Providers.valueOf(conf.get(key, defaultValue)).clazz;
+   } catch (IllegalArgumentException notAShortName) {
+     // Otherwise treat the configured value as a class name.
+     // The default class given here shouldn't actually be used: valueOf only fails when a
+     // config value is present.
+     providerClass = conf.getClass(key, DefaultWALProvider.class, WALProvider.class);
+   }
+   try {
+     final WALProvider created = providerClass.newInstance();
+     created.init(this, conf, listeners, providerId);
+     return created;
+   } catch (InstantiationException failure) {
+     LOG.error("couldn't set up WALProvider, check config key " + key);
+     LOG.debug("Exception details for failure to load WALProvider.", failure);
+     throw new IOException("couldn't set up WALProvider", failure);
+   } catch (IllegalAccessException failure) {
+     LOG.error("couldn't set up WALProvider, check config key " + key);
+     LOG.debug("Exception details for failure to load WALProvider.", failure);
+     throw new IOException("couldn't set up WALProvider", failure);
+   }
+ }
+
+ /**
+ * @param conf must not be null, will keep a reference to read params in later reader/writer
+ * instances.
+ * @param listeners may be null. will be given to all created wals (and not meta-wals)
+ * @param factoryId a unique identifier for this factory. used i.e. by filesystem implementations
+ * to make a directory
+ */
+ public WALFactory(final Configuration conf, final List<WALActionsListener> listeners,
+ final String factoryId) throws IOException {
+ // until we've moved reader/writer construction down into providers, this initialization must
+ // happen prior to provider initialization, in case they need to instantiate a reader/writer.
+ timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
+ /* TODO Both of these are probably specific to the fs wal provider */
+ logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
+ DefaultWALProvider.Reader.class);
+ this.conf = conf;
+ this.factoryId = factoryId;
+ // end required early initialization
+ if (conf.getBoolean("hbase.regionserver.hlog.enabled", true)) {
+ provider = getProvider(WAL_PROVIDER, DEFAULT_WAL_PROVIDER, listeners, null);
+ } else {
+ // special handling of existing configuration behavior.
+ LOG.warn("Running with WAL disabled.");
+ provider = new DisabledWALProvider();
+ provider.init(this, conf, null, factoryId);
+ }
+ }
+
+ /**
+  * Shutdown all WALs and clean up any underlying storage.
+  * Use only when you will not need to replay any edits that have gone to any wals from this
+  * factory.
+  * @throws IOException if closing a provider fails
+  */
+ public void close() throws IOException {
+   final WALProvider meta = this.metaProvider.get();
+   if (meta != null) {
+     meta.close();
+   }
+   // The provider is null for the temporary factory that getInstance closes when it loses the
+   // race to install the singleton, so it has to be checked here.
+   if (provider != null) {
+     provider.close();
+   }
+ }
+
+ /**
+  * Tell the underlying WAL providers to shut down, but do not clean up underlying storage.
+  * If you are not ending cleanly and will need to replay edits from this factory's wals,
+  * use this method if you can as it will try to leave things as tidy as possible.
+  * <p>
+  * The meta provider (if one was created) is shut down first. A failure there is held back so
+  * that the main provider still gets its shutdown call, and is rethrown afterwards.
+  * @throws IOException if either provider fails to shut down
+  */
+ public void shutdown() throws IOException {
+   IOException exception = null;
+   final WALProvider metaProvider = this.metaProvider.get();
+   if (null != metaProvider) {
+     try {
+       metaProvider.shutdown();
+     } catch(IOException ioe) {
+       exception = ioe;
+     }
+   }
+   // The singleton factory is constructed with a null provider. Guard the same way close()
+   // does, so shutting such a factory down is a no-op instead of a NullPointerException.
+   if (null != provider) {
+     provider.shutdown();
+   }
+   if (null != exception) {
+     throw exception;
+   }
+ }
+
+ /**
+  * Ask the main provider for the WAL that goes with the given identifier.
+  * @param identifier may not be null, contents will not be altered
+  * @return whatever WAL the provider returns for the identifier
+  */
+ public WAL getWAL(final byte[] identifier) throws IOException {
+   final WAL wal = provider.getWAL(identifier);
+   return wal;
+ }
+
+ /**
+  * Ask the meta provider for a WAL, creating that provider on first use.
+  * If two threads race to create the provider, the loser closes the one it built and uses the
+  * winner's.
+  * @param identifier may not be null, contents will not be altered
+  */
+ public WAL getMetaWAL(final byte[] identifier) throws IOException {
+   final WALProvider existing = this.metaProvider.get();
+   if (null != existing) {
+     return existing.getWAL(identifier);
+   }
+   final WALProvider candidate = getProvider(META_WAL_PROVIDER, DEFAULT_META_WAL_PROVIDER,
+       Collections.<WALActionsListener>singletonList(new MetricsWAL()),
+       DefaultWALProvider.META_WAL_PROVIDER_ID);
+   if (this.metaProvider.compareAndSet(null, candidate)) {
+     return candidate.getWAL(identifier);
+   }
+   // reference must now be to a provider created in another thread.
+   candidate.close();
+   return this.metaProvider.get().getWAL(identifier);
+ }
+
+ /**
+  * Create a reader for the WAL at the given path, with no progress reporter.
+  * @return A WAL reader. Close when done with it.
+  */
+ public Reader createReader(final FileSystem fs, final Path path) throws IOException {
+   final CancelableProgressable noReporter = null;
+   return createReader(fs, path, noReporter);
+ }
+
+ /**
+  * Create a reader for the WAL, allowing a custom reader class from the configuration.
+  * If you are reading from a file that's being written to and need to reopen it multiple
+  * times, use {@link WAL.Reader#reset()} instead of this method, then just seek back to the
+  * last known good position.
+  * @param reporter may be null; passed on to the four-argument overload
+  * @return A WAL reader. Close when done with it.
+  * @throws IOException
+  */
+ public Reader createReader(final FileSystem fs, final Path path,
+     CancelableProgressable reporter) throws IOException {
+   final boolean allowCustom = true;
+   return createReader(fs, path, reporter, allowCustom);
+ }
+
+ /**
+  * Create a reader for the WAL at the given path, retrying while the file looks like it is
+  * still under recovery.
+  * @param reporter may be null. When present, a false return from its progress() call during
+  *          a retry cancels the open with an InterruptedIOException.
+  * @param allowCustom when true, use the reader class named in the configuration; when false,
+  *          always sniff the file and use one of the built-in readers.
+  * @return A WAL reader. Close when done with it.
+  * @throws IOException if the file can't be opened, or the reader can't be created
+  */
+ public Reader createReader(final FileSystem fs, final Path path,
+     CancelableProgressable reporter, boolean allowCustom)
+     throws IOException {
+   Class<? extends DefaultWALProvider.Reader> lrClass =
+       allowCustom ? logReaderClass : ProtobufLogReader.class;
+
+   try {
+     // A wal file could be under recovery, so it may take several
+     // tries to get it open. Instead of claiming it is corrupted, retry
+     // to open it up to 5 minutes by default.
+     long startWaiting = EnvironmentEdgeManager.currentTime();
+     long openTimeout = timeoutMillis + startWaiting;
+     int nbAttempt = 0;
+     while (true) {
+       try {
+         if (lrClass != ProtobufLogReader.class) {
+           // User is overriding the WAL reader, let them.
+           DefaultWALProvider.Reader reader = lrClass.newInstance();
+           reader.init(fs, path, conf, null);
+           return reader;
+         } else {
+           FSDataInputStream stream = fs.open(path);
+           // Set once the reader has accepted the stream; until then we own it and must
+           // close it on any failure, otherwise every retry leaks an open stream.
+           boolean handedOff = false;
+           try {
+             // Note that zero-length file will fail to read PB magic, and attempt to create
+             // a non-PB reader and fail the same way existing code expects it to. If we get
+             // rid of the old reader entirely, we need to handle 0-size files differently
+             // from merely non-PB files.
+             byte[] magic = new byte[ProtobufLogReader.PB_WAL_MAGIC.length];
+             boolean isPbWal = (stream.read(magic) == magic.length)
+                 && Arrays.equals(magic, ProtobufLogReader.PB_WAL_MAGIC);
+             DefaultWALProvider.Reader reader =
+                 isPbWal ? new ProtobufLogReader() : new SequenceFileLogReader();
+             reader.init(fs, path, conf, stream);
+             handedOff = true;
+             return reader;
+           } finally {
+             if (!handedOff) {
+               try {
+                 stream.close();
+               } catch (IOException closeFailure) {
+                 // Best effort only; the original failure is the one worth reporting.
+                 LOG.debug("Failed to close stream for " + path
+                     + " after failing to open a reader.", closeFailure);
+               }
+             }
+           }
+         }
+       } catch (IOException e) {
+         String msg = e.getMessage();
+         if (msg != null && (msg.contains("Cannot obtain block length")
+             || msg.contains("Could not obtain the last block")
+             || msg.matches("Blocklist for [^ ]* has changed.*"))) {
+           if (++nbAttempt == 1) {
+             LOG.warn("Lease should have recovered. This is not expected. Will retry", e);
+           }
+           if (reporter != null && !reporter.progress()) {
+             throw new InterruptedIOException("Operation is cancelled");
+           }
+           if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) {
+             LOG.error("Can't open after " + nbAttempt + " attempts and "
+                 + (EnvironmentEdgeManager.currentTime() - startWaiting)
+                 + "ms " + " for " + path);
+           } else {
+             try {
+               Thread.sleep(nbAttempt < 3 ? 500 : 1000);
+               continue; // retry
+             } catch (InterruptedException ie) {
+               InterruptedIOException iioe = new InterruptedIOException();
+               iioe.initCause(ie);
+               throw iioe;
+             }
+           }
+         }
+         // Not a recovery-related failure, or we ran out of time: give up with the original.
+         throw e;
+       }
+     }
+   } catch (IOException ie) {
+     throw ie;
+   } catch (Exception e) {
+     // e.g. the reader class could not be instantiated reflectively.
+     throw new IOException("Cannot get log reader", e);
+   }
+ }
+
+ /**
+  * Create a writer for the WAL, using this factory's configuration.
+  * should be package-private. public only for tests and
+  * {@link org.apache.hadoop.hbase.regionserver.wal.Compressor}
+  * @return A WAL writer. Close when done with it.
+  * @throws IOException
+  */
+ public Writer createWALWriter(final FileSystem fs, final Path path) throws IOException {
+   final boolean overwritable = false;
+   return DefaultWALProvider.createWriter(conf, fs, path, overwritable);
+ }
+
+ /**
+  * Create a writer for recovered edits, using this factory's configuration.
+  * should be package-private, visible for recovery testing.
+  * @return an overwritable writer for recovered edits. caller should close.
+  */
+ @VisibleForTesting
+ public Writer createRecoveredEditsWriter(final FileSystem fs, final Path path)
+     throws IOException {
+   final boolean overwritable = true;
+   return DefaultWALProvider.createWriter(conf, fs, path, overwritable);
+ }
+
+ // These static methods are currently used where it's impractical to
+ // untangle the reliance on state in the filesystem. They rely on singleton
+ // WALFactory that just provides Reader / Writers.
+ // For now, first Configuration object wins. Practically this just impacts the reader/writer class
+ private static final AtomicReference<WALFactory> singleton = new AtomicReference<WALFactory>();
+ private static final String SINGLETON_ID = WALFactory.class.getName();
+
+ // public only for FSHLog and UpgradeTo96
+ /**
+  * Get the shared reader/writer-only factory, creating it from the given configuration if it
+  * doesn't exist yet. Once it exists, the configuration argument is ignored.
+  */
+ public static WALFactory getInstance(Configuration configuration) {
+   final WALFactory existing = singleton.get();
+   if (null != existing) {
+     return existing;
+   }
+   final WALFactory candidate = new WALFactory(configuration);
+   if (singleton.compareAndSet(null, candidate)) {
+     return candidate;
+   }
+   // someone else beat us to initializing
+   try {
+     candidate.close();
+   } catch (IOException exception) {
+     LOG.debug("failed to close temporary singleton. ignoring.", exception);
+   }
+   return singleton.get();
+ }
+
+ /**
+  * Create a reader for the given path through the singleton factory, accepting custom reader
+  * classes from conf.
+  * If you already have a WALFactory, you should favor the instance method.
+  * @return a WAL Reader, caller must close.
+  */
+ public static Reader createReader(final FileSystem fs, final Path path,
+     final Configuration configuration) throws IOException {
+   final WALFactory shared = getInstance(configuration);
+   return shared.createReader(fs, path);
+ }
+
+ /**
+ * Create a reader for the given path, accept custom reader classes from conf.
+ * If you already have a WALFactory, you should favor the instance method.
+ * @return a WAL Reader, caller must close.
+ */
+ static Reader createReader(final FileSystem fs, final Path path,
+ final Configuration configuration, final CancelableProgressable reporter) throws IOException {
+ return getInstance(configuration).createReader(fs, path, reporter);
+ }
+
+ /**
+  * Create a reader for the given path through the singleton factory, ignoring custom reader
+  * classes from conf.
+  * If you already have a WALFactory, you should favor the instance method.
+  * only public pending move of {@link org.apache.hadoop.hbase.regionserver.wal.Compressor}
+  * @return a WAL Reader, caller must close.
+  */
+ public static Reader createReaderIgnoreCustomClass(final FileSystem fs, final Path path,
+     final Configuration configuration) throws IOException {
+   final WALFactory shared = getInstance(configuration);
+   return shared.createReader(fs, path, null, false);
+ }
+
+ /**
+ * If you already have a WALFactory, you should favor the instance method.
+ * @return a Writer that will overwrite files. Caller must close.
+ */
+ static Writer createRecoveredEditsWriter(final FileSystem fs, final Path path,
+ final Configuration configuration)
+ throws IOException {
+ return DefaultWALProvider.createWriter(configuration, fs, path, true);
+ }
+
+ /**
+  * Create a WAL writer directly from a configuration, without a factory instance.
+  * If you already have a WALFactory, you should favor the instance method.
+  * @return a writer that won't overwrite files. Caller must close.
+  */
+ @VisibleForTesting
+ public static Writer createWALWriter(final FileSystem fs, final Path path,
+     final Configuration configuration)
+     throws IOException {
+   final boolean overwritable = false;
+   return DefaultWALProvider.createWriter(configuration, fs, path, overwritable);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
new file mode 100644
index 0000000..61c7a97
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
@@ -0,0 +1,553 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+
+
+import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FamilyScope;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.ScopeType;
+import org.apache.hadoop.hbase.regionserver.SequenceId;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ByteString;
+
+// imports for things that haven't moved from regionserver.wal yet.
+import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
+import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
+
+/**
+ * A Key for an entry in the change log.
+ *
+ * The log intermingles edits to many tables and rows, so each log entry
+ * identifies the appropriate table and row. Within a table and row, they're
+ * also sorted.
+ *
+ * <p>Some Transactional edits (START, COMMIT, ABORT) will not have an
+ * associated row.
+ *
+ * Note that protected members marked @InterfaceAudience.Private are only protected
+ * to support the legacy HLogKey class, which is in a different package.
+ */
+// TODO: Key and WALEdit are never used separately, or in one-to-many relation, for practical
+// purposes. They need to be merged into WALEntry.
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+public class WALKey implements SequenceId, Comparable<WALKey> {
+ public static final Log LOG = LogFactory.getLog(WALKey.class);
+
+ // should be < 0 (@see HLogKey#readFields(DataInput))
+ // version 2 supports WAL compression
+ // public members here are only public because of HLogKey
+ /**
+  * Key format versions. Codes start at 0 and go down by one per constant, which lets
+  * {@link #fromCode(int)} use the negated code as an index into the constants.
+  */
+ @InterfaceAudience.Private
+ protected enum Version {
+   UNVERSIONED(0),
+   // Initial number we put on WALKey when we introduced versioning.
+   INITIAL(-1),
+   // Version -2 introduced a dictionary compression facility. Only this
+   // dictionary-based compression is available in version -2.
+   COMPRESSED(-2);
+
+   public final int code;
+   static final Version[] byCode;
+   static {
+     byCode = Version.values();
+     // Fail at class-load time if a constant's code doesn't match its (negated) position.
+     for (int position = 0; position < byCode.length; position++) {
+       if (byCode[position].code != -position) {
+         throw new AssertionError("Values in this enum should be descending by one");
+       }
+     }
+   }
+
+   Version(int code) {
+     this.code = code;
+   }
+
+   /** @return true if this version's code is less than or equal to the other's */
+   public boolean atLeast(Version other) {
+     return this.code <= other.code;
+   }
+
+   /** @return the constant stored at index {@code -code} */
+   public static Version fromCode(int code) {
+     return byCode[-code];
+   }
+ }
+
+ /*
+ * This is used for reading the log entries created by the previous releases
+ * (0.94.11) which write the clusters information to the scopes of WALEdit.
+ */
+ private static final String PREFIX_CLUSTER_KEY = ".";
+
+
+ // visible for deprecated HLogKey
+ @InterfaceAudience.Private
+ protected static final Version VERSION = Version.COMPRESSED;
+
+ /** Used to represent when a particular wal key doesn't know/care about the sequence ordering. */
+ public static final long NO_SEQUENCE_ID = -1;
+
+
+ // visible for deprecated HLogKey
+ @InterfaceAudience.Private
+ protected byte [] encodedRegionName;
+ // visible for deprecated HLogKey
+ @InterfaceAudience.Private
+ protected TableName tablename;
+ // visible for deprecated HLogKey
+ @InterfaceAudience.Private
+ protected long logSeqNum;
+ private long origLogSeqNum = 0;
+ private CountDownLatch seqNumAssignedLatch = new CountDownLatch(1);
+ // Time at which this edit was written.
+ // visible for deprecated HLogKey
+ @InterfaceAudience.Private
+ protected long writeTime;
+
+ // The first element in the list is the cluster id on which the change has originated
+ // visible for deprecated HLogKey
+ @InterfaceAudience.Private
+ protected List<UUID> clusterIds;
+
+ private NavigableMap<byte[], Integer> scopes;
+
+ private long nonceGroup = HConstants.NO_NONCE;
+ private long nonce = HConstants.NO_NONCE;
+ static final List<UUID> EMPTY_UUIDS = Collections.unmodifiableList(new ArrayList<UUID>());
+
+ // visible for deprecated HLogKey
+ @InterfaceAudience.Private
+ protected CompressionContext compressionContext;
+
+ public WALKey() {
+ init(null, null, 0L, HConstants.LATEST_TIMESTAMP,
+ new ArrayList<UUID>(), HConstants.NO_NONCE, HConstants.NO_NONCE);
+ }
+
+ @VisibleForTesting
+ public WALKey(final byte[] encodedRegionName, final TableName tablename, long logSeqNum,
+ final long now, UUID clusterId) {
+ List<UUID> clusterIds = new ArrayList<UUID>();
+ clusterIds.add(clusterId);
+ init(encodedRegionName, tablename, logSeqNum, now, clusterIds,
+ HConstants.NO_NONCE, HConstants.NO_NONCE);
+ }
+
+ /**
+  * Create a key stamped with the current time, with no sequence id, cluster ids or nonces.
+  * @param encodedRegionName encoded name of the region
+  * @param tablename name of the table
+  */
+ public WALKey(final byte[] encodedRegionName, final TableName tablename) {
+   // Take "now" from EnvironmentEdgeManager, as the other constructor that defaults the write
+   // time does, rather than reading the system clock directly.
+   this(encodedRegionName, tablename, EnvironmentEdgeManager.currentTime());
+ }
+
+ public WALKey(final byte[] encodedRegionName, final TableName tablename, final long now) {
+ init(encodedRegionName, tablename, NO_SEQUENCE_ID, now,
+ EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE);
+ }
+
+ /**
+  * Create the log key for writing to somewhere, with every field supplied by the caller.
+  * We maintain the tablename mainly for debugging purposes.
+  * A regionName is always a sub-table object.
+  * <p>Used by log splitting and snapshots.
+  *
+  * @param encodedRegionName Encoded name of the region as returned by
+  * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
+  * @param tablename - name of table
+  * @param logSeqNum - log sequence number
+  * @param now Time at which this edit was written.
+  * @param clusterIds the clusters that have consumed the change(used in Replication)
+  * @param nonceGroup the nonce group stored on the key
+  * @param nonce the nonce stored on the key
+  */
+ public WALKey(final byte [] encodedRegionName, final TableName tablename,
+     long logSeqNum, final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
+   this.init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce);
+ }
+
+ /**
+ * Create the log key for writing to somewhere.
+ * We maintain the tablename mainly for debugging purposes.
+ * A regionName is always a sub-table object.
+ *
+ * @param encodedRegionName Encoded name of the region as returned by
+ * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
+ * @param tablename
+ * @param now Time at which this edit was written.
+ * @param clusterIds the clusters that have consumed the change(used in Replication)
+ * @param nonceGroup
+ * @param nonce
+ */
+ public WALKey(final byte [] encodedRegionName, final TableName tablename,
+ final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
+ init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds,
+ nonceGroup, nonce);
+ }
+
+ /**
+ * Create the log key for writing to somewhere.
+ * We maintain the tablename mainly for debugging purposes.
+ * A regionName is always a sub-table object.
+ *
+ * @param encodedRegionName Encoded name of the region as returned by
+ * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
+ * @param tablename
+ * @param logSeqNum
+ * @param nonceGroup
+ * @param nonce
+ */
+ public WALKey(final byte [] encodedRegionName, final TableName tablename, long logSeqNum,
+ long nonceGroup, long nonce) {
+ init(encodedRegionName, tablename, logSeqNum, EnvironmentEdgeManager.currentTime(),
+ EMPTY_UUIDS, nonceGroup, nonce);
+ }
+
+ /**
+  * Store the given values in this key's fields. The cluster id list is kept by reference,
+  * not copied.
+  */
+ @InterfaceAudience.Private
+ protected void init(final byte [] encodedRegionName, final TableName tablename,
+     long logSeqNum, final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
+   // what the edit belongs to
+   this.encodedRegionName = encodedRegionName;
+   this.tablename = tablename;
+   // ordering and timing
+   this.logSeqNum = logSeqNum;
+   this.writeTime = now;
+   // replication and nonce bookkeeping
+   this.clusterIds = clusterIds;
+   this.nonceGroup = nonceGroup;
+   this.nonce = nonce;
+ }
+
+ /**
+  * Set the compression context held by this key.
+  * @param compressionContext Compression context to use
+  */
+ public void setCompressionContext(CompressionContext compressionContext) {
+   final CompressionContext context = compressionContext;
+   this.compressionContext = context;
+ }
+
+ /**
+  * @return encoded region name (the stored array itself, not a copy)
+  */
+ public byte [] getEncodedRegionName() {
+   return this.encodedRegionName;
+ }
+
+ /**
+  * @return table name
+  */
+ public TableName getTablename() {
+   return this.tablename;
+ }
+
+ /**
+  * Read the sequence number as it currently is, without waiting for it to be assigned.
+  * @return log sequence number
+  */
+ public long getLogSeqNum() {
+   return logSeqNum;
+ }
+
+ /**
+  * Set the log sequence id after construction and release everyone waiting in
+  * {@link #getSequenceId()} for it to be assigned.
+  * Only public for {@link org.apache.hadoop.hbase.regionserver.wal.FSWALEntry}
+  * @param sequence the sequence number to store
+  */
+ @InterfaceAudience.Private
+ public void setLogSeqNum(final long sequence) {
+   logSeqNum = sequence;
+   // Count down only after the value is stored, so released waiters read the new number.
+   seqNumAssignedLatch.countDown();
+ }
+
+ /**
+  * Used to set original seq Id for WALKey during wal replay
+  * @param seqId the original sequence id to remember
+  */
+ public void setOrigLogSeqNum(final long seqId) {
+   origLogSeqNum = seqId;
+ }
+
+ /**
+  * Return a positive long if current WALKey is created from a replay edit
+  * @return original sequence number of the WALEdit (0 unless one has been set)
+  */
+ public long getOrigLogSeqNum() {
+   return origLogSeqNum;
+ }
+
+ /**
+  * Wait until the sequence number has been assigned via {@link #setLogSeqNum(long)}, then
+  * return the assigned value.
+  * @return long the new assigned sequence number
+  * @throws IOException an {@link InterruptedIOException} (with the interrupt as its cause) if
+  *           the thread is interrupted while waiting
+  */
+ @Override
+ public long getSequenceId() throws IOException {
+   try {
+     seqNumAssignedLatch.await();
+   } catch (InterruptedException interrupted) {
+     LOG.warn("Thread interrupted waiting for next log sequence number");
+     final InterruptedIOException wrapped = new InterruptedIOException();
+     wrapped.initCause(interrupted);
+     throw wrapped;
+   }
+   return logSeqNum;
+ }
+
+ /**
+  * @return the write time this key was given
+  */
+ public long getWriteTime() {
+   return writeTime;
+ }
+
+ /**
+  * @return the scopes map held by this key; null if none has been set
+  */
+ public NavigableMap<byte[], Integer> getScopes() {
+   return this.scopes;
+ }
+
+ /**
+  * @return The nonce group
+  */
+ public long getNonceGroup() {
+   return this.nonceGroup;
+ }
+
+ /**
+  * @return The nonce
+  */
+ public long getNonce() {
+   return this.nonce;
+ }
+
+ /**
+  * Replace this key's scopes map. The map is kept by reference, not copied.
+  */
+ public void setScopes(NavigableMap<byte[], Integer> scopes) {
+   final NavigableMap<byte[], Integer> replacement = scopes;
+   this.scopes = replacement;
+ }
+
+ /**
+  * Pull cluster ids out of a scopes map written in the older format.
+  * Every entry whose key starts with {@code PREFIX_CLUSTER_KEY} is parsed as a UUID, added to
+  * this key's cluster ids and removed from the passed map (the argument is modified). If any
+  * entries are left afterwards, the map becomes this key's scopes.
+  * @param scopes may be null, in which case nothing happens
+  */
+ public void readOlderScopes(NavigableMap<byte[], Integer> scopes) {
+   if (scopes == null) {
+     return;
+   }
+   for (Iterator<Map.Entry<byte[], Integer>> entries = scopes.entrySet().iterator();
+       entries.hasNext();) {
+     final Map.Entry<byte[], Integer> entry = entries.next();
+     final String name = Bytes.toString(entry.getKey());
+     if (!name.startsWith(PREFIX_CLUSTER_KEY)) {
+       continue;
+     }
+     final String uuidText = name.substring(PREFIX_CLUSTER_KEY.length());
+     addClusterId(UUID.fromString(uuidText));
+     entries.remove();
+   }
+   if (scopes.size() > 0) {
+     this.scopes = scopes;
+   }
+ }
+
+ /**
+  * Marks that the cluster with the given clusterId has consumed the change.
+  * Does nothing if the id is already recorded.
+  * @param clusterId the id of the cluster to record
+  */
+ public void addClusterId(UUID clusterId) {
+   if (clusterIds.contains(clusterId)) {
+     return;
+   }
+   if (clusterIds == EMPTY_UUIDS) {
+     // Several constructors store the shared, unmodifiable EMPTY_UUIDS list. Adding to it
+     // would throw UnsupportedOperationException, so switch to a private mutable list first.
+     clusterIds = new ArrayList<UUID>();
+   }
+   clusterIds.add(clusterId);
+ }
+
+ /**
+  * @return the cluster Ids that have consumed the change (the stored list itself, not a copy)
+  */
+ public List<UUID> getClusterIds() {
+   return this.clusterIds;
+ }
+
+ /**
+  * @return the cluster id on which the change has originated, i.e. the first recorded cluster
+  *         id. If there is no such cluster, it returns DEFAULT_CLUSTER_ID (cases where
+  *         replication is not enabled)
+  */
+ public UUID getOriginatingClusterId(){
+   if (clusterIds.isEmpty()) {
+     return HConstants.DEFAULT_CLUSTER_ID;
+   }
+   return clusterIds.get(0);
+ }
+
+ /**
+  * @return the table name, the region name and the sequence number, separated by slashes
+  */
+ @Override
+ public String toString() {
+   final StringBuilder text = new StringBuilder();
+   text.append(tablename);
+   text.append("/");
+   text.append(Bytes.toString(encodedRegionName));
+   text.append("/");
+   text.append(logSeqNum);
+   return text.toString();
+ }
+
+ /**
+  * Produces a string map for this key. Useful for programmatic use and
+  * manipulation of the data stored in an WALKey, for example, printing
+  * as JSON.
+  *
+  * @return a new Map with the entries "table", "region" and "sequence"
+  */
+ public Map<String, Object> toStringMap() {
+   final String printableRegion = Bytes.toStringBinary(encodedRegionName);
+   final Map<String, Object> fields = new HashMap<String, Object>();
+   fields.put("table", tablename);
+   fields.put("region", printableRegion);
+   fields.put("sequence", logSeqNum);
+   return fields;
+ }
+
+ /**
+  * Two keys are equal when they are of exactly the same class and {@link #compareTo(WALKey)}
+  * reports no difference between them.
+  */
+ @Override
+ public boolean equals(Object obj) {
+   if (obj == this) {
+     return true;
+   }
+   if (obj == null) {
+     return false;
+   }
+   if (obj.getClass() != getClass()) {
+     return false;
+   }
+   final WALKey other = (WALKey) obj;
+   return compareTo(other) == 0;
+ }
+
+ /**
+  * Hash of the region name combined with the sequence number and write time, the same three
+  * fields that {@link #compareTo(WALKey)} looks at.
+  */
+ @Override
+ public int hashCode() {
+   int hash = Bytes.hashCode(encodedRegionName);
+   // The casts spell out the narrowing that a compound ^= with a long operand performs.
+   hash = (int) (hash ^ logSeqNum);
+   hash = (int) (hash ^ writeTime);
+   return hash;
+ }
+
+ /**
+  * Order keys by encoded region name, then by sequence number, then by write time.
+  */
+ @Override
+ public int compareTo(WALKey o) {
+   final int byRegion = Bytes.compareTo(this.encodedRegionName, o.encodedRegionName);
+   if (byRegion != 0) {
+     return byRegion;
+   }
+   if (this.logSeqNum < o.logSeqNum) {
+     return -1;
+   }
+   if (this.logSeqNum > o.logSeqNum) {
+     return 1;
+   }
+   if (this.writeTime < o.writeTime) {
+     return -1;
+   }
+   if (this.writeTime > o.writeTime) {
+     return 1;
+   }
+   // why isn't cluster id accounted for?
+   return 0;
+ }
+
+ /**
+ * Drop this instance's tablename reference and instead
+ * hold a reference to the provided tablename. This is not
+ * meant to be a general purpose setter - it's only used
+ * to collapse references to conserve memory.
+ *
+ * @param tablename replacement reference; asserted to be equal to the current table name
+ */
+ void internTableName(TableName tablename) {
+ // We should not use this as a setter - only to swap
+ // in a new reference to the same table name.
+ // The check is an assert, so it only runs when assertions are enabled.
+ assert tablename.equals(this.tablename);
+ this.tablename = tablename;
+ }
+
+ /**
+ * Drop this instance's region name byte array and instead
+ * hold a reference to the provided region name. This is not
+ * meant to be a general purpose setter - it's only used
+ * to collapse references to conserve memory.
+ *
+ * @param encodedRegionName replacement array; asserted to have the same contents as the
+ * current encoded region name
+ */
+ void internEncodedRegionName(byte []encodedRegionName) {
+ // We should not use this as a setter - only to swap
+ // in a new reference to the same region name.
+ // The check is an assert, so it only runs when assertions are enabled.
+ assert Bytes.equals(this.encodedRegionName, encodedRegionName);
+ this.encodedRegionName = encodedRegionName;
+ }
+
+ /**
+ * Creates a protobuf WALKey builder populated from the fields of this key.
+ * <p>
+ * When a compression context is set on this key, the encoded region name, the table name
+ * and every scope family are passed through the given compressor together with the matching
+ * dictionary of the context; otherwise the raw bytes are wrapped as they are.
+ *
+ * @param compressor used only when this key has a compression context
+ * @return the populated builder (not yet built)
+ * @throws IOException declared for the compressor calls
+ */
+ public org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.Builder getBuilder(WALCellCodec.ByteStringCompressor compressor)
+ throws IOException {
+ org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.Builder builder = org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.newBuilder();
+ if (compressionContext == null) {
+ builder.setEncodedRegionName(ByteStringer.wrap(this.encodedRegionName));
+ builder.setTableName(ByteStringer.wrap(this.tablename.getName()));
+ } else {
+ builder.setEncodedRegionName(compressor.compress(this.encodedRegionName,
+ compressionContext.regionDict));
+ builder.setTableName(compressor.compress(this.tablename.getName(),
+ compressionContext.tableDict));
+ }
+ builder.setLogSequenceNumber(this.logSeqNum);
+ builder.setWriteTime(writeTime);
+ // The optional fields below are written only when they carry a non-default value.
+ if(this.origLogSeqNum > 0) {
+ builder.setOrigSequenceNumber(this.origLogSeqNum);
+ }
+ if (this.nonce != HConstants.NO_NONCE) {
+ builder.setNonce(nonce);
+ }
+ if (this.nonceGroup != HConstants.NO_NONCE) {
+ builder.setNonceGroup(nonceGroup);
+ }
+ // A single UUID builder is reused; both halves are overwritten on every iteration
+ // before build() is called.
+ HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder();
+ for (UUID clusterId : clusterIds) {
+ uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits());
+ uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits());
+ builder.addClusterIds(uuidBuilder.build());
+ }
+ if (scopes != null) {
+ for (Map.Entry<byte[], Integer> e : scopes.entrySet()) {
+ // Family names follow the same rule as region/table names: compressed only when a
+ // compression context is present.
+ ByteString family = (compressionContext == null) ? ByteStringer.wrap(e.getKey())
+ : compressor.compress(e.getKey(), compressionContext.familyDict);
+ builder.addScopes(FamilyScope.newBuilder()
+ .setFamily(family).setScopeType(ScopeType.valueOf(e.getValue())));
+ }
+ }
+ return builder;
+ }
+
+ /**
+ * Populates this key from a protobuf WALKey message.
+ * <p>
+ * When a compression context is set on this key, the encoded region name, the table name
+ * and the scope families are run through the given uncompressor with the matching
+ * dictionary of the context; otherwise the message bytes are copied out directly.
+ *
+ * @param walKey the protobuf message to read from
+ * @param uncompressor used only when this key has a compression context
+ * @throws IOException declared for the uncompressor calls
+ */
+ public void readFieldsFromPb(
+ org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey walKey, WALCellCodec.ByteStringUncompressor uncompressor) throws IOException {
+ if (this.compressionContext != null) {
+ this.encodedRegionName = uncompressor.uncompress(
+ walKey.getEncodedRegionName(), compressionContext.regionDict);
+ byte[] tablenameBytes = uncompressor.uncompress(
+ walKey.getTableName(), compressionContext.tableDict);
+ this.tablename = TableName.valueOf(tablenameBytes);
+ } else {
+ this.encodedRegionName = walKey.getEncodedRegionName().toByteArray();
+ this.tablename = TableName.valueOf(walKey.getTableName().toByteArray());
+ }
+ // The cluster id list is rebuilt from scratch: the single clusterId field (if present)
+ // goes first, followed by the entries of the repeated clusterIds field.
+ clusterIds.clear();
+ if (walKey.hasClusterId()) {
+ //When we are reading the older log (0.95.1 release)
+ //This is definitely the originating cluster
+ clusterIds.add(new UUID(walKey.getClusterId().getMostSigBits(), walKey.getClusterId()
+ .getLeastSigBits()));
+ }
+ for (HBaseProtos.UUID clusterId : walKey.getClusterIdsList()) {
+ clusterIds.add(new UUID(clusterId.getMostSigBits(), clusterId.getLeastSigBits()));
+ }
+ // NOTE(review): nonceGroup and nonce keep whatever value they had before this call when
+ // the message does not carry them - confirm that instances are never reused across reads.
+ if (walKey.hasNonceGroup()) {
+ this.nonceGroup = walKey.getNonceGroup();
+ }
+ if (walKey.hasNonce()) {
+ this.nonce = walKey.getNonce();
+ }
+ // Scopes are reset to null and only materialized when the message has at least one.
+ this.scopes = null;
+ if (walKey.getScopesCount() > 0) {
+ this.scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+ for (FamilyScope scope : walKey.getScopesList()) {
+ byte[] family = (compressionContext == null) ? scope.getFamily().toByteArray() :
+ uncompressor.uncompress(scope.getFamily(), compressionContext.familyDict);
+ this.scopes.put(family, scope.getScopeType().getNumber());
+ }
+ }
+ this.logSeqNum = walKey.getLogSequenceNumber();
+ this.writeTime = walKey.getWriteTime();
+ // NOTE(review): like the nonce fields, origLogSeqNum is left untouched when absent.
+ if(walKey.hasOrigSequenceNumber()) {
+ this.origLogSeqNum = walKey.getOrigSequenceNumber();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java
new file mode 100644
index 0000000..280f731
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java
@@ -0,0 +1,407 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.codehaus.jackson.map.ObjectMapper;
+
+// imports for things that haven't moved yet.
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+
+/**
+ * WALPrettyPrinter prints the contents of a given WAL with a variety of
+ * options affecting formatting and extent of content.
+ *
+ * It targets two usage cases: pretty printing for ease of debugging directly by
+ * humans, and JSON output for consumption by monitoring and/or maintenance
+ * scripts.
+ *
+ * It can filter by row, region, or sequence id.
+ *
+ * It can also toggle output of values.
+ *
+ */
+@InterfaceAudience.Private
+public class WALPrettyPrinter {
+ private boolean outputValues;
+ private boolean outputJSON;
+ // The following enable filtering by sequence, region, and row, respectively
+ private long sequence;
+ private String region;
+ private String row;
+ // enable in order to output a single list of transactions from several files
+ private boolean persistentOutput;
+ private boolean firstTxn;
+ // useful for programatic capture of JSON output
+ private PrintStream out;
+ // for JSON encoding
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ /**
+ * Basic constructor that simply initializes values to reasonable defaults.
+ */
+ public WALPrettyPrinter() {
+ outputValues = false;
+ outputJSON = false;
+ sequence = -1;
+ region = null;
+ row = null;
+ persistentOutput = false;
+ firstTxn = true;
+ out = System.out;
+ }
+
+ /**
+ * Fully specified constructor.
+ *
+ * @param outputValues
+ * when true, enables output of values along with other log
+ * information
+ * @param outputJSON
+ * when true, enables output in JSON format rather than a
+ * "pretty string"
+ * @param sequence
+ * when nonnegative, serves as a filter; only log entries with this
+ * sequence id will be printed
+ * @param region
+ * when not null, serves as a filter; only log entries from this
+ * region will be printed
+ * @param row
+ * when not null, serves as a filter; only log entries from this row
+ * will be printed
+ * @param persistentOutput
+ * keeps a single list running for multiple files. if enabled, the
+ * endPersistentOutput() method must be used!
+ * @param out
+ * Specifies an alternative to stdout for the destination of this
+ * PrettyPrinter's output.
+ */
+ public WALPrettyPrinter(boolean outputValues, boolean outputJSON,
+ long sequence, String region, String row, boolean persistentOutput,
+ PrintStream out) {
+ this.outputValues = outputValues;
+ this.outputJSON = outputJSON;
+ this.sequence = sequence;
+ this.region = region;
+ this.row = row;
+ this.persistentOutput = persistentOutput;
+ if (persistentOutput) {
+ beginPersistentOutput();
+ }
+ this.out = out;
+ this.firstTxn = true;
+ }
+
+ /**
+ * turns value output on
+ */
+ public void enableValues() {
+ outputValues = true;
+ }
+
+ /**
+ * turns value output off
+ */
+ public void disableValues() {
+ outputValues = false;
+ }
+
+ /**
+ * turns JSON output on
+ */
+ public void enableJSON() {
+ outputJSON = true;
+ }
+
+ /**
+ * turns JSON output off, and turns on "pretty strings" for human consumption
+ */
+ public void disableJSON() {
+ outputJSON = false;
+ }
+
+ /**
+ * sets the region by which output will be filtered
+ *
+ * @param sequence
+ * when nonnegative, serves as a filter; only log entries with this
+ * sequence id will be printed
+ */
+ public void setSequenceFilter(long sequence) {
+ this.sequence = sequence;
+ }
+
+ /**
+ * sets the region by which output will be filtered
+ *
+ * @param region
+ * when not null, serves as a filter; only log entries from this
+ * region will be printed
+ */
+ public void setRegionFilter(String region) {
+ this.region = region;
+ }
+
+ /**
+ * sets the region by which output will be filtered
+ *
+ * @param row
+ * when not null, serves as a filter; only log entries from this row
+ * will be printed
+ */
+ public void setRowFilter(String row) {
+ this.row = row;
+ }
+
+ /**
+ * enables output as a single, persistent list. at present, only relevant in
+ * the case of JSON output.
+ */
+ public void beginPersistentOutput() {
+ if (persistentOutput)
+ return;
+ persistentOutput = true;
+ firstTxn = true;
+ if (outputJSON)
+ out.print("[");
+ }
+
+ /**
+ * ends output of a single, persistent list. at present, only relevant in the
+ * case of JSON output.
+ */
+ public void endPersistentOutput() {
+ if (!persistentOutput)
+ return;
+ persistentOutput = false;
+ if (outputJSON)
+ out.print("]");
+ }
+
+ /**
+ * reads a log file and outputs its contents, one transaction at a time, as
+ * specified by the currently configured options
+ *
+ * @param conf
+ * the HBase configuration relevant to this log file
+ * @param p
+ * the path of the log file to be read
+ * @throws IOException
+ * may be unable to access the configured filesystem or requested
+ * file.
+ */
+ public void processFile(final Configuration conf, final Path p)
+ throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ if (!fs.exists(p)) {
+ throw new FileNotFoundException(p.toString());
+ }
+ if (!fs.isFile(p)) {
+ throw new IOException(p + " is not a file");
+ }
+ if (outputJSON && !persistentOutput) {
+ out.print("[");
+ firstTxn = true;
+ }
+ WAL.Reader log = WALFactory.createReader(fs, p, conf);
+ try {
+ WAL.Entry entry;
+ while ((entry = log.next()) != null) {
+ WALKey key = entry.getKey();
+ WALEdit edit = entry.getEdit();
+ // begin building a transaction structure
+ Map<String, Object> txn = key.toStringMap();
+ long writeTime = key.getWriteTime();
+ // check output filters
+ if (sequence >= 0 && ((Long) txn.get("sequence")) != sequence)
+ continue;
+ if (region != null && !((String) txn.get("region")).equals(region))
+ continue;
+ // initialize list into which we will store atomic actions
+ List<Map> actions = new ArrayList<Map>();
+ for (Cell cell : edit.getCells()) {
+ // add atomic operation to txn
+ Map<String, Object> op = new HashMap<String, Object>(toStringMap(cell));
+ if (outputValues) op.put("value", Bytes.toStringBinary(cell.getValue()));
+ // check row output filter
+ if (row == null || ((String) op.get("row")).equals(row))
+ actions.add(op);
+ }
+ if (actions.size() == 0)
+ continue;
+ txn.put("actions", actions);
+ if (outputJSON) {
+ // JSON output is a straightforward "toString" on the txn object
+ if (firstTxn)
+ firstTxn = false;
+ else
+ out.print(",");
+ // encode and print JSON
+ out.print(MAPPER.writeValueAsString(txn));
+ } else {
+ // Pretty output, complete with indentation by atomic action
+ out.println("Sequence " + txn.get("sequence") + " "
+ + "from region " + txn.get("region") + " " + "in table "
+ + txn.get("table") + " at write timestamp: " + new Date(writeTime));
+ for (int i = 0; i < actions.size(); i++) {
+ Map op = actions.get(i);
+ out.println(" Action:");
+ out.println(" row: " + op.get("row"));
+ out.println(" column: " + op.get("family") + ":"
+ + op.get("qualifier"));
+ out.println(" timestamp: "
+ + (new Date((Long) op.get("timestamp"))));
+ if(op.get("tag") != null) {
+ out.println(" tag: " + op.get("tag"));
+ }
+ if (outputValues)
+ out.println(" value: " + op.get("value"));
+ }
+ }
+ }
+ } finally {
+ log.close();
+ }
+ if (outputJSON && !persistentOutput) {
+ out.print("]");
+ }
+ }
+
+ private static Map<String, Object> toStringMap(Cell cell) {
+ Map<String, Object> stringMap = new HashMap<String, Object>();
+ stringMap.put("row",
+ Bytes.toStringBinary(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
+ stringMap.put("family", Bytes.toStringBinary(cell.getFamilyArray(), cell.getFamilyOffset(),
+ cell.getFamilyLength()));
+ stringMap.put("qualifier",
+ Bytes.toStringBinary(cell.getQualifierArray(), cell.getQualifierOffset(),
+ cell.getQualifierLength()));
+ stringMap.put("timestamp", cell.getTimestamp());
+ stringMap.put("vlen", cell.getValueLength());
+ if (cell.getTagsLength() > 0) {
+ List<String> tagsString = new ArrayList<String>();
+ Iterator<Tag> tagsIterator = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
+ cell.getTagsLength());
+ while (tagsIterator.hasNext()) {
+ Tag tag = tagsIterator.next();
+ tagsString.add((tag.getType()) + ":"
+ + Bytes.toStringBinary(tag.getBuffer(), tag.getTagOffset(), tag.getTagLength()));
+ }
+ stringMap.put("tag", tagsString);
+ }
+ return stringMap;
+ }
+
+ public static void main(String[] args) throws IOException {
+ run(args);
+ }
+
+ /**
+ * Pass one or more log file names and formatting options and it will dump out
+ * a text version of the contents on <code>stdout</code>.
+ *
+ * @param args
+ * Command line arguments
+ * @throws IOException
+ * Thrown upon file system errors etc.
+ * @throws ParseException
+ * Thrown if command-line parsing fails.
+ */
+ public static void run(String[] args) throws IOException {
+ // create options
+ Options options = new Options();
+ options.addOption("h", "help", false, "Output help message");
+ options.addOption("j", "json", false, "Output JSON");
+ options.addOption("p", "printvals", false, "Print values");
+ options.addOption("r", "region", true,
+ "Region to filter by. Pass region name; e.g. 'hbase:meta,,1'");
+ options.addOption("s", "sequence", true,
+ "Sequence to filter by. Pass sequence number.");
+ options.addOption("w", "row", true, "Row to filter by. Pass row name.");
+
+ WALPrettyPrinter printer = new WALPrettyPrinter();
+ CommandLineParser parser = new PosixParser();
+ List files = null;
+ try {
+ CommandLine cmd = parser.parse(options, args);
+ files = cmd.getArgList();
+ if (files.size() == 0 || cmd.hasOption("h")) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("WAL <filename...>", options, true);
+ System.exit(-1);
+ }
+ // configure the pretty printer using command line options
+ if (cmd.hasOption("p"))
+ printer.enableValues();
+ if (cmd.hasOption("j"))
+ printer.enableJSON();
+ if (cmd.hasOption("r"))
+ printer.setRegionFilter(cmd.getOptionValue("r"));
+ if (cmd.hasOption("s"))
+ printer.setSequenceFilter(Long.parseLong(cmd.getOptionValue("s")));
+ if (cmd.hasOption("w"))
+ printer.setRowFilter(cmd.getOptionValue("w"));
+ } catch (ParseException e) {
+ e.printStackTrace();
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("HFile filename(s) ", options, true);
+ System.exit(-1);
+ }
+ // get configuration, file system, and process the given files
+ Configuration conf = HBaseConfiguration.create();
+ FSUtils.setFsDefault(conf, FSUtils.getRootDir(conf));
+
+ // begin output
+ printer.beginPersistentOutput();
+ for (Object f : files) {
+ Path file = new Path((String) f);
+ FileSystem fs = file.getFileSystem(conf);
+ if (!fs.exists(file)) {
+ System.err.println("ERROR, file doesnt exist: " + file);
+ return;
+ }
+ printer.processFile(conf, file);
+ }
+ printer.endPersistentOutput();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
new file mode 100644
index 0000000..b27abf9
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
@@ -0,0 +1,83 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+// imports for things that haven't moved from regionserver.wal yet.
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+
+/**
+ * The Write Ahead Log (WAL) stores all durable edits to the HRegion.
+ * This interface provides the entry point for all WAL implementors.
+ * <p>
+ * See {@link DefaultWALProvider} for an example implementation.
+ *
+ * A single WALProvider will be used for retrieving multiple WALs in a particular region server
+ * and must be threadsafe.
+ */
+@InterfaceAudience.Private
+public interface WALProvider {
+
+ /**
+ * Set up the provider to create wals.
+ * will only be called once per instance.
+ * @param factory factory that made us may not be null
+ * @param conf may not be null
+ * @param listeners may be null
+ * @param providerId differentiate between providers from one factory. may be null
+ */
+ void init(final WALFactory factory, final Configuration conf,
+ final List<WALActionsListener> listeners, final String providerId) throws IOException;
+
+ /**
+ * @param identifier may not be null. contents will not be altered.
+ * @return a WAL for writing entries for the given region.
+ */
+ WAL getWAL(final byte[] identifier) throws IOException;
+
+ /**
+ * persist outstanding WALs to storage and stop accepting new appends.
+ * This method serves as shorthand for sending a sync to every WAL provided by a given
+ * implementation. Those WALs will also stop accepting new writes.
+ */
+ void shutdown() throws IOException;
+
+ /**
+ * shutdown utstanding WALs and clean up any persisted state.
+ * Call this method only when you will not need to replay any of the edits to the WALs from
+ * this provider. After this call completes, the underlying resources should have been reclaimed.
+ */
+ void close() throws IOException;
+
+ // Writers are used internally. Users outside of the WAL should be relying on the
+ // interface provided by WAL.
+ interface Writer extends Closeable {
+ void sync() throws IOException;
+ void append(WAL.Entry entry) throws IOException;
+ long getLength() throws IOException;
+ }
+
+}