Posted to commits@geode.apache.org by up...@apache.org on 2016/04/27 22:50:06 UTC
[20/25] incubator-geode git commit: GEODE-10: Reinstating HDFS
persistence code
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HFileSortedOplog.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HFileSortedOplog.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HFileSortedOplog.java
new file mode 100644
index 0000000..5ba20d2
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HFileSortedOplog.java
@@ -0,0 +1,853 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.gemstone.gemfire.internal.hll.HyperLogLog;
+import com.gemstone.gemfire.internal.hll.ICardinality;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.util.ShutdownHookManager;
+
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.DelegatingSerializedComparator;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.HFileStoreStatistics;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics.ScanOperation;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SerializedComparator;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.util.Hex;
+import com.gemstone.gemfire.internal.util.SingletonValue;
+import com.gemstone.gemfire.internal.util.SingletonValue.SingletonBuilder;
+
+import org.apache.hadoop.hbase.io.hfile.BlockCache;
+import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
+import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
+import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.BlockIndexReader;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
+import org.apache.hadoop.hbase.util.BloomFilterFactory;
+import org.apache.hadoop.hbase.util.BloomFilterWriter;
+
+/**
+ * Implements an HFile-based {@link Hoplog}
+ */
+public final class HFileSortedOplog extends AbstractHoplog {
+
+// private static final boolean CACHE_DATA_BLOCKS_ON_READ = !Boolean.getBoolean("gemfire.HFileSortedOplog.DISABLE_CACHE_ON_READ");
+ private final CacheConfig cacheConf;
+ private ICardinality entryCountEstimate;
+
+ // a cached reader for the file
+ private final SingletonValue<HFileReader> reader;
+
+ public HFileSortedOplog(HDFSStoreImpl store, Path hfilePath,
+ BlockCache blockCache, SortedOplogStatistics stats,
+ HFileStoreStatistics storeStats) throws IOException {
+ super(store, hfilePath, stats);
+ cacheConf = getCacheConfInstance(blockCache, stats, storeStats);
+ reader = getReaderContainer();
+ }
+
+ /**
+   * THIS METHOD SHOULD BE USED FOR A LONER (standalone member) ONLY
+ */
+ public static HFileSortedOplog getHoplogForLoner(FileSystem inputFS,
+ Path hfilePath) throws IOException {
+ return new HFileSortedOplog(inputFS, hfilePath, null, null, null);
+ }
+
+ private HFileSortedOplog(FileSystem inputFS, Path hfilePath,
+ BlockCache blockCache, SortedOplogStatistics stats,
+ HFileStoreStatistics storeStats) throws IOException {
+ super(inputFS, hfilePath, stats);
+ cacheConf = getCacheConfInstance(blockCache, stats, storeStats);
+ reader = getReaderContainer();
+ }
+
+ protected CacheConfig getCacheConfInstance(BlockCache blockCache,
+ SortedOplogStatistics stats, HFileStoreStatistics storeStats) {
+ CacheConfig tmpConfig = null;
+// if (stats == null) {
+ tmpConfig = new CacheConfig(conf);
+// } else {
+// tmpConfig = new CacheConfig(conf, CACHE_DATA_BLOCKS_ON_READ, blockCache,
+// HFileSortedOplogFactory.convertStatistics(stats, storeStats));
+// }
+ tmpConfig.shouldCacheBlockOnRead(BlockCategory.ALL_CATEGORIES);
+ return tmpConfig;
+ }
+
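+  // wraps reader creation in a SingletonValue so the HFileReader is built
+  // lazily and at most once; concurrent getReader() calls share one instance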
+ private SingletonValue<HFileReader> getReaderContainer() {
+ return new SingletonValue<HFileReader>(new SingletonBuilder<HFileReader>() {
+ @Override
+ public HFileReader create() throws IOException {
+ if (logger.isDebugEnabled())
+ logger.debug("{}Creating hoplog reader", logPrefix);
+ return new HFileReader();
+ }
+
+ @Override
+ public void postCreate() {
+ if (readerListener != null) {
+ readerListener.readerCreated();
+ }
+ }
+
+ @Override
+ public void createInProgress() {
+ }
+ });
+ }
+
+ @Override
+ public HoplogReader getReader() throws IOException {
+ return reader.get();
+ }
+
+ @Override
+ public ICardinality getEntryCountEstimate() throws IOException {
+ ICardinality result = entryCountEstimate;
+ if (result == null) {
+ HoplogReader rdr = getReader(); // keep this out of the critical section
+ synchronized(this) {
+ result = entryCountEstimate;
+ if (result == null) {
+ entryCountEstimate = result = rdr.getCardinalityEstimator();
+ }
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public HoplogWriter createWriter(int keys) throws IOException {
+ return new HFileSortedOplogWriter(keys);
+ }
+
+ @Override
+ public boolean isClosed() {
+ HFileReader rdr = reader.getCachedValue();
+ return rdr == null || rdr.isClosed();
+ }
+
+ @Override
+ public void close() throws IOException {
+ close(true);
+ }
+
+ @Override
+ public void close(boolean clearCache) throws IOException {
+ compareAndClose(null, clearCache);
+ }
+
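+  /**
+   * Closes the cached reader. If a specific hfileReader is provided, it is
+   * closed only if it is still the cached instance; otherwise the close is
+   * skipped so a newer reader is not torn down by a stale caller.
+   */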
+ private void compareAndClose(HFileReader hfileReader, boolean clearCache) throws IOException {
+    HFileReader rdr;
+ if (hfileReader == null) {
+ rdr = reader.clear(true);
+ } else {
+ boolean result = reader.clear(hfileReader, true);
+      if (!result) {
+ if (logger.isDebugEnabled())
+ logger.debug("{}skipping close, provided hfileReader mismatched", logPrefix);
+ return;
+ }
+ rdr = hfileReader;
+ }
+
+ if (rdr != null) {
+ try {
+ rdr.close(clearCache);
+ } finally {
+ if (readerListener != null) {
+ readerListener.readerClosed();
+ }
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "HFileSortedOplog[" + getFileName() + "]";
+ }
+
+ private class HFileSortedOplogWriter implements HoplogWriter {
+ private final Writer writer;
+ private final BloomFilterWriter bfw;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+ public HFileSortedOplogWriter(int keys) throws IOException {
+ try {
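+        // block size defaults to 64KB (1 << 16) unless overridden by the
+        // system property named by HFILE_BLOCK_SIZE_CONF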
+ int hfileBlockSize = Integer.getInteger(
+ HoplogConfig.HFILE_BLOCK_SIZE_CONF, (1 << 16));
+
+ Algorithm compress = Algorithm.valueOf(System.getProperty(HoplogConfig.COMPRESSION,
+ HoplogConfig.COMPRESSION_DEFAULT));
+
+// ByteComparator bc = new ByteComparator();
+ writer = HFile.getWriterFactory(conf, cacheConf)
+ .withPath(fsProvider.getFS(), path)
+ .withBlockSize(hfileBlockSize)
+// .withComparator(bc)
+ .withCompression(compress)
+ .create();
+// bfw = BloomFilterFactory.createGeneralBloomAtWrite(conf, cacheConf, BloomType.ROW, keys,
+// writer, bc);
+ bfw = BloomFilterFactory.createGeneralBloomAtWrite(conf, cacheConf, BloomType.ROW, keys,
+ writer);
+
+ if (logger.isDebugEnabled())
+ logger.debug("{}Created hoplog writer with compression " + compress, logPrefix);
+ } catch (IOException e) {
+ if (logger.isDebugEnabled())
+ logger.debug("{}IO Error while creating writer", logPrefix);
+ throw e;
+ }
+ }
+
+ @Override
+ public void append(byte[] key, byte[] value) throws IOException {
+ writer.append(key, value);
+ bfw.add(key, 0, key.length);
+ }
+
+ @Override
+ public void append(ByteBuffer key, ByteBuffer value) throws IOException {
+ byte[] keyBytes = byteBufferToArray(key);
+ byte[] valueBytes = byteBufferToArray(value);
+ writer.append(keyBytes, valueBytes);
+ bfw.add(keyBytes, 0, keyBytes.length);
+ }
+
+ @Override
+ public void close() throws IOException {
+ close(null);
+ }
+
+ @Override
+ public void close(EnumMap<Meta, byte[]> metadata) throws IOException {
+ if (closed.get()) {
+ if (logger.isDebugEnabled())
+ logger.debug("{}Writer already closed", logPrefix);
+ return;
+ }
+
+ bfw.compactBloom();
+ writer.addGeneralBloomFilter(bfw);
+
+ // append system metadata
+ writer.appendFileInfo(Meta.GEMFIRE_MAGIC.toBytes(), Hoplog.MAGIC);
+ writer.appendFileInfo(Meta.SORTED_OPLOG_VERSION.toBytes(), HoplogVersion.V1.toBytes());
+ writer.appendFileInfo(Meta.GEMFIRE_VERSION.toBytes(), Version.CURRENT.toBytes());
+
+ // append comparator info
+// if (writer.getComparator() instanceof DelegatingSerializedComparator) {
+// ByteArrayOutputStream bos = new ByteArrayOutputStream();
+// DataOutput out = new DataOutputStream(bos);
+//
+// writeComparatorInfo(out, ((DelegatingSerializedComparator) writer.getComparator()).getComparators());
+// writer.appendFileInfo(Meta.COMPARATORS.toBytes(), bos.toByteArray());
+// }
+
+ // append user metadata
+ HyperLogLog cachedEntryCountEstimate = null;
+ if (metadata != null) {
+ for (Entry<Meta, byte[]> entry : metadata.entrySet()) {
+ writer.appendFileInfo(entry.getKey().toBytes(), entry.getValue());
+ if (Meta.LOCAL_CARDINALITY_ESTIMATE_V2.equals(entry.getKey())) {
+ cachedEntryCountEstimate = HyperLogLog.Builder.build(entry.getValue());
+ }
+ }
+ }
+
+ writer.close();
+ if (logger.isDebugEnabled())
+ logger.debug("{}Completed closing writer", logPrefix);
+ closed.set(true);
+ // cache estimate value to avoid reads later
+ entryCountEstimate = cachedEntryCountEstimate;
+ }
+
+ @Override
+ public void hsync() throws IOException {
+ throw new UnsupportedOperationException("hsync is not supported for HFiles");
+ }
+
+ @Override
+ public long getCurrentSize() throws IOException {
+ throw new UnsupportedOperationException("getCurrentSize is not supported for HFiles");
+ }
+
+// private void writeComparatorInfo(DataOutput out, SerializedComparator[] comparators) throws IOException {
+// out.writeInt(comparators.length);
+// for (SerializedComparator sc : comparators) {
+// out.writeUTF(sc.getClass().getName());
+// if (sc instanceof DelegatingSerializedComparator) {
+// writeComparatorInfo(out, ((DelegatingSerializedComparator) sc).getComparators());
+// }
+// }
+// }
+ }
+
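+  /**
+   * Classifies a read failure. A failure during member shutdown is rethrown
+   * as CacheClosedException; a RemoteException is unwrapped and rethrown as
+   * HDFSIOException. If the file system client has changed since the reader
+   * was created, the stale reader is closed and, when skipFailIfSafe is set,
+   * the method returns normally so the caller can retry with a fresh reader.
+   * All other failures are rethrown as HDFSIOException.
+   */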
+ private void handleReadIOError(HFileReader hfileReader, IOException e, boolean skipFailIfSafe) {
+ if (logger.isDebugEnabled())
+ logger.debug("Read IO error", e);
+ boolean safeError = ShutdownHookManager.get().isShutdownInProgress();
+ if (safeError) {
+      // IOException because of a closed file system. This happens when the
+      // member is shutting down
+ if (logger.isDebugEnabled())
+ logger.debug("IO error caused by filesystem shutdown", e);
+ throw new CacheClosedException("IO error caused by filesystem shutdown", e);
+ }
+
+    // expose the error wrapped inside the RemoteException. Remote exceptions
+    // are handled by the file system client, so let the caller handle this error
+ if (e instanceof RemoteException) {
+ e = ((RemoteException) e).unwrapRemoteException();
+ throw new HDFSIOException(LocalizedStrings.HOPLOG_FAILED_TO_READ_HDFS_FILE.toLocalizedString(path), e);
+ }
+
+ FileSystem currentFs = fsProvider.checkFileSystem();
+ if (hfileReader != null && hfileReader.previousFS != currentFs) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}Detected new FS client, closing old reader", logPrefix);
+      if (currentFs != null) {
+        logger.debug("{}CurrentFs: {}-{}", logPrefix, currentFs.getUri(),
+            currentFs.hashCode());
+      }
+      if (hfileReader.previousFS != null) {
+        logger.debug("{}OldFs: {}-{}, closing old reader", logPrefix,
+            hfileReader.previousFS.getUri(), hfileReader.previousFS.hashCode());
+      }
+ }
+ try {
+ HFileSortedOplog.this.compareAndClose(hfileReader, false);
+ } catch (Exception ex) {
+ if (logger.isDebugEnabled())
+ logger.debug("Failed to close reader", ex);
+ }
+ if (skipFailIfSafe) {
+ if (logger.isDebugEnabled())
+ logger.debug("Not faling after io error since FS client changed");
+ return;
+ }
+ }
+
+ // it is not a safe error. let the caller handle it
+ throw new HDFSIOException(LocalizedStrings.HOPLOG_FAILED_TO_READ_HDFS_FILE.toLocalizedString(path), e);
+ }
+
+ class HFileReader implements HoplogReader, Closeable {
+ private final Reader reader;
+ private volatile BloomFilter hoplogBloom;
+ private final AtomicBoolean closed;
+ private final Map<byte[], byte[]> fileInfo;
+ private final HyperLogLog estimator;
+ private final FileSystem previousFS;
+
+ public HFileReader() throws IOException {
+ try {
+ FileSystem fs = fsProvider.getFS();
+ reader = HFile.createReader(fs, path, cacheConf);
+ fileInfo = reader.loadFileInfo();
+ closed = new AtomicBoolean(false);
+
+ validate();
+ if (reader.getComparator() instanceof DelegatingSerializedComparator) {
+ loadComparators((DelegatingSerializedComparator) reader.getComparator());
+ }
+
+ // read the old HLL if it exists so that a CardinalityMergeException will trigger a Major Compaction
+ byte[] hll = fileInfo.get(Meta.LOCAL_CARDINALITY_ESTIMATE.toBytes());
+ if (hll != null) {
+ entryCountEstimate = estimator = HyperLogLog.Builder.build(hll);
+ } else if ((hll = fileInfo.get(Meta.LOCAL_CARDINALITY_ESTIMATE_V2.toBytes())) != null) {
+ entryCountEstimate = estimator = HyperLogLog.Builder.build(hll);
+ } else {
+ estimator = new HyperLogLog(HdfsSortedOplogOrganizer.HLL_CONSTANT);
+ }
+
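+        // remember which FS client created this reader; handleReadIOError
+        // compares it against the current client to detect a recreated FS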
+ previousFS = fs;
+ } catch (IOException e) {
+ if (logger.isDebugEnabled())
+ logger.debug("IO Error while creating reader", e);
+ throw e;
+ }
+ }
+
+ @Override
+ public byte[] read(byte[] key) throws IOException {
+ IOException err = null;
+ HFileReader delegateReader = this;
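+      // at most one retry: handleReadIOError either throws, or (if the FS
+      // client changed) closes the stale reader and returns, letting us fetch
+      // a fresh reader for the second attempt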
+ for (int retry = 1; retry >= 0; retry --) {
+ try {
+ return delegateReader.readDelegate(key);
+ } catch (IOException e) {
+ err = e;
+ handleReadIOError(delegateReader, e, retry > 0);
+          // The current reader may have been closed during error handling.
+          // Get a new one for the retry attempt
+ try {
+ delegateReader = (HFileReader) HFileSortedOplog.this.getReader();
+ } catch (IOException ex) {
+ handleReadIOError(null, e, false);
+ }
+ }
+ }
+
+ if (logger.isDebugEnabled())
+ logger.debug("Throwing err from read delegate ", err);
+ throw err;
+ }
+
+ private byte[] readDelegate(byte[] key) throws IOException {
+ try {
+ if (!getBloomFilter().mightContain(key)) {
+ // bloom filter check failed, the key is not present in this hoplog
+ return null;
+ }
+ } catch (IllegalArgumentException e) {
+ if (IOException.class.isAssignableFrom(e.getCause().getClass())) {
+ throw (IOException) e.getCause();
+ } else {
+ throw e;
+ }
+ }
+
+ byte[] valueBytes = null;
+ ByteBuffer bb = get(key);
+ if (bb != null) {
+ valueBytes = new byte[bb.remaining()];
+ bb.get(valueBytes);
+ } else {
+ stats.getBloom().falsePositive();
+ }
+ return valueBytes;
+ }
+
+ @Override
+ public ByteBuffer get(byte[] key) throws IOException {
+ assert key != null;
+ HFileScanner seek = reader.getScanner(false, true);
+ if (seek.seekTo(key) == 0) {
+ return seek.getValue();
+ }
+ return null;
+ }
+
+ @Override
+ public HoplogIterator<byte[], byte[]> scan(byte[] from, boolean fromInclusive, byte[] to,
+ boolean toInclusive) throws IOException {
+ IOException err = null;
+ HFileReader delegateReader = this;
+ for (int retry = 1; retry >= 0; retry --) {
+ try {
+ return delegateReader.scanDelegate(from, fromInclusive, to, toInclusive);
+ } catch (IOException e) {
+ err = e;
+ handleReadIOError(delegateReader, e, retry > 0);
+          // The current reader may have been closed during error handling.
+          // Get a new one for the retry attempt
+ try {
+ delegateReader = (HFileReader) HFileSortedOplog.this.getReader();
+ } catch (IOException ex) {
+ handleReadIOError(null, e, false);
+ }
+ }
+ }
+ if (logger.isDebugEnabled())
+ logger.debug("Throwing err from scan delegate ", err);
+ throw err;
+ }
+
+ private HoplogIterator<byte[], byte[]> scanDelegate(byte[] from, boolean fromInclusive, byte[] to,
+ boolean toInclusive) throws IOException {
+ return new HFileSortedIterator(reader.getScanner(true, false), from,
+ fromInclusive, to, toInclusive);
+ }
+
+ @Override
+ public HoplogIterator<byte[], byte[]> scan(long offset, long length)
+ throws IOException {
+ /**
+     * Identifies the first and last key to be scanned based on offset and
+     * length. It loads the hfile block index and finds the first hfile block
+     * starting at or after offset; the key of that block becomes the from key
+     * for the scanner. Similarly it locates the first block starting beyond
+     * the offset + length range and uses that block's key as the to key.
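+     *
+     * For example (illustrative numbers only), with root blocks at offsets
+     * 0, 64K and 128K, scan(60000, 80000) picks the key of the 64K block as
+     * the from key and, since 60000 + 80000 exceeds 128K, leaves the to key
+     * null so the scan runs to the end of the file.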
+ */
+
+ // load block indexes in memory
+ BlockIndexReader bir = reader.getDataBlockIndexReader();
+ int blockCount = bir.getRootBlockCount();
+
+ byte[] fromKey = null, toKey = null;
+
+ // find from key
+ int i = 0;
+ for (; i < blockCount; i++) {
+ if (bir.getRootBlockOffset(i) < offset) {
+ // hfile block has offset less than this reader's split offset. check
+ // the next block
+ continue;
+ }
+
+ // found the first hfile block starting after offset
+ fromKey = bir.getRootBlockKey(i);
+ break;
+ }
+
+ if (fromKey == null) {
+ // seems no block starts after the offset. return no-op scanner
+ return new HFileSortedIterator(null, null, false, null, false);
+ }
+
+ // find to key
+ for (; i < blockCount; i++) {
+ if (bir.getRootBlockOffset(i) < (offset + length)) {
+        // this hfile block lies within the offset+length range. check the
+ // next block for a higher offset
+ continue;
+ }
+
+        // found the first block starting beyond the offset+length range.
+ toKey = bir.getRootBlockKey(i);
+ break;
+ }
+
+      // the from key is included in the scan and the to key is excluded
+ HFileScanner scanner = reader.getScanner(true, false);
+ return new HFileSortedIterator(scanner, fromKey, true, toKey, false);
+ }
+
+ @Override
+ public HoplogIterator<byte[], byte[]> scan() throws IOException {
+ return scan(null, null);
+ }
+
+ public HoplogIterator<byte[], byte[]> scan(byte[] from, byte[] to)
+ throws IOException {
+ return scan(from, true, to, false);
+ }
+
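+    // lazily builds the BloomFilter wrapper; double-checked locking on the
+    // volatile hoplogBloom field keeps reads cheap after initialization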
+ @Override
+ public BloomFilter getBloomFilter() throws IOException {
+ BloomFilter result = hoplogBloom;
+ if (result == null) {
+ synchronized (this) {
+ result = hoplogBloom;
+ if (result == null) {
+ hoplogBloom = result = new BloomFilterImpl();
+ }
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public boolean isClosed() {
+ return closed.get();
+ }
+
+ @Override
+ public void close() throws IOException {
+ close(true);
+ }
+
+ public void close(boolean clearCache) throws IOException {
+ if (closed.compareAndSet(false, true)) {
+ if (logger.isDebugEnabled())
+ logger.debug("{}Closing reader", logPrefix);
+ reader.close(clearCache);
+ }
+ }
+
+ @Override
+ public long getEntryCount() {
+ return reader.getEntries();
+ }
+
+ public ICardinality getCardinalityEstimator() {
+ return estimator;
+ }
+
+ @Override
+ public long sizeEstimate() {
+ return getCardinalityEstimator().cardinality();
+ }
+
+ private void validate() throws IOException {
+ // check magic
+ byte[] magic = fileInfo.get(Meta.GEMFIRE_MAGIC.toBytes());
+ if (!Arrays.equals(magic, MAGIC)) {
+ throw new IOException(LocalizedStrings.Soplog_INVALID_MAGIC.toLocalizedString(Hex.toHex(magic)));
+ }
+
+ // check version compatibility
+ byte[] ver = fileInfo.get(Meta.SORTED_OPLOG_VERSION.toBytes());
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}Hoplog version is " + Hex.toHex(ver), logPrefix);
+ }
+
+ if (!Arrays.equals(ver, HoplogVersion.V1.toBytes())) {
+ throw new IOException(LocalizedStrings.Soplog_UNRECOGNIZED_VERSION.toLocalizedString(Hex.toHex(ver)));
+ }
+ }
+
+ private void loadComparators(DelegatingSerializedComparator comparator) throws IOException {
+ byte[] raw = fileInfo.get(Meta.COMPARATORS.toBytes());
+ assert raw != null;
+
+ DataInput in = new DataInputStream(new ByteArrayInputStream(raw));
+ comparator.setComparators(readComparators(in));
+ }
+
+ private SerializedComparator[] readComparators(DataInput in) throws IOException {
+ try {
+ SerializedComparator[] comps = new SerializedComparator[in.readInt()];
+ assert comps.length > 0;
+
+ for (int i = 0; i < comps.length; i++) {
+ comps[i] = (SerializedComparator) Class.forName(in.readUTF()).newInstance();
+ if (comps[i] instanceof DelegatingSerializedComparator) {
+ ((DelegatingSerializedComparator) comps[i]).setComparators(readComparators(in));
+ }
+ }
+ return comps;
+
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ class BloomFilterImpl implements BloomFilter {
+ private final org.apache.hadoop.hbase.util.BloomFilter hfileBloom;
+
+ public BloomFilterImpl() throws IOException {
+ DataInput bin = reader.getGeneralBloomFilterMetadata();
+ // instantiate bloom filter if meta present in hfile
+ if (bin != null) {
+ hfileBloom = BloomFilterFactory.createFromMeta(bin, reader);
+ if (reader.getComparator() instanceof DelegatingSerializedComparator) {
+ loadComparators((DelegatingSerializedComparator) hfileBloom.getComparator());
+ }
+ } else {
+ hfileBloom = null;
+ }
+ }
+
+ @Override
+ public boolean mightContain(byte[] key) {
+ assert key != null;
+ return mightContain(key, 0, key.length);
+ }
+
+ @Override
+ public boolean mightContain(byte[] key, int keyOffset, int keyLength) {
+ assert key != null;
+ long start = stats.getBloom().begin();
+      boolean found = hfileBloom == null || hfileBloom.contains(key, keyOffset, keyLength, null);
+ stats.getBloom().end(start);
+ return found;
+ }
+
+ @Override
+ public long getBloomSize() {
+ return hfileBloom == null ? 0 : hfileBloom.getByteSize();
+ }
+ }
+
+ // TODO change the KV types to ByteBuffer instead of byte[]
+ public final class HFileSortedIterator implements HoplogIterator<byte[], byte[]> {
+ private final HFileScanner scan;
+
+ private final byte[] from;
+ private final boolean fromInclusive;
+
+ private final byte[] to;
+ private final boolean toInclusive;
+
+ private ByteBuffer prefetchedKey;
+ private ByteBuffer prefetchedValue;
+ private ByteBuffer currentKey;
+ private ByteBuffer currentValue;
+
+ // variable linked to scan stats
+ ScanOperation scanStat;
+ private long scanStart;
+
+ public HFileSortedIterator(HFileScanner scan, byte[] from, boolean fromInclusive, byte[] to,
+ boolean toInclusive) throws IOException {
+ this.scan = scan;
+ this.from = from;
+ this.fromInclusive = fromInclusive;
+ this.to = to;
+ this.toInclusive = toInclusive;
+
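+      // when no statistics are available (e.g. in a loner), record into a
+      // throwaway ScanOperation so the iterator code need not null-check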
+ scanStat = (stats == null) ? new SortedOplogStatistics("", "").new ScanOperation(
+ 0, 0, 0, 0, 0, 0, 0) : stats.getScan();
+ scanStart = scanStat.begin();
+
+ if (scan == null) {
+ return;
+ }
+
+ assert from == null || to == null
+ || scan.getReader().getComparator().compare(from, to) <= 0;
+
+ initIterator();
+ }
+
+ /*
+     * prefetches the first key and value from the file for hasNext() to work
+ */
+ private void initIterator() throws IOException {
+ long startNext = scanStat.beginIteration();
+ boolean scanSuccessful = true;
+ if (from == null) {
+ scanSuccessful = scan.seekTo();
+ } else {
+ int compare = scan.seekTo(from);
+        if ((compare == 0 && !fromInclusive) || compare > 0) {
+          // either from is exclusive and the first key equals from, or the
+          // scanner is positioned before from; skip to the next key
+ scanSuccessful = scan.next();
+ }
+ }
+
+ populateKV(startNext, scanSuccessful);
+ }
+
+ @Override
+ public boolean hasNext() {
+ return prefetchedKey != null;
+ }
+
+ @Override
+ public byte[] next() throws IOException {
+ return byteBufferToArray(nextBB());
+ }
+
+ public ByteBuffer nextBB() throws IOException {
+ long startNext = scanStat.beginIteration();
+ if (prefetchedKey == null) {
+ throw new NoSuchElementException();
+ }
+
+ currentKey = prefetchedKey;
+ currentValue = prefetchedValue;
+
+ prefetchedKey = null;
+ prefetchedValue = null;
+
+ if (scan.next()) {
+ populateKV(startNext, true);
+ }
+
+ return currentKey;
+ }
+
+
+ private void populateKV(long nextStartTime, boolean scanSuccessful) {
+ if (!scanSuccessful) {
+ //end of file reached. collect stats and return
+ scanStat.endIteration(0, nextStartTime);
+ return;
+ }
+
+ prefetchedKey = scan.getKey();
+ prefetchedValue = scan.getValue();
+
+ if (to != null) {
+        // TODO Optimization? Perform int comparison instead of byte[].
+        // Identify offset of the first key greater than 'to'.
+ int compare = -1;
+ compare = scan.getReader().getComparator().compare
+ (prefetchedKey.array(), prefetchedKey.arrayOffset(), prefetchedKey.remaining(), to, 0, to.length);
+ if (compare > 0 || (compare == 0 && !toInclusive)) {
+ prefetchedKey = null;
+ prefetchedValue = null;
+ return;
+ }
+ }
+
+ // account for bytes read and time spent
+ int byteCount = prefetchedKey.remaining() + prefetchedValue.remaining();
+ scanStat.endIteration(byteCount, nextStartTime);
+ }
+
+
+ @Override
+ public byte[] getKey() {
+ return byteBufferToArray(getKeyBB());
+ }
+ public ByteBuffer getKeyBB() {
+ return currentKey;
+ }
+
+ @Override
+ public byte[] getValue() {
+ return byteBufferToArray(getValueBB());
+ }
+ public ByteBuffer getValueBB() {
+ return currentValue;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Cannot delete a key-value from a hfile sorted oplog");
+ }
+
+ @Override
+ public void close() {
+ scanStat.end(scanStart);
+ }
+ }
+ }
+
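+  /**
+   * Copies the remaining bytes of the buffer into a new array. The copy is
+   * made from a duplicate so the buffer's own position is left untouched.
+   */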
+ public static byte[] byteBufferToArray(ByteBuffer bb) {
+ if (bb == null) {
+ return null;
+ }
+
+ byte[] tmp = new byte[bb.remaining()];
+ bb.duplicate().get(tmp);
+ return tmp;
+ }
+}