You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2015/11/17 21:54:50 UTC
[40/50] [abbrv] incubator-geode git commit: GEODE-544: Removes soplog
code and tests
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9438c8b1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SoplogToken.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SoplogToken.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SoplogToken.java
deleted file mode 100644
index 73175e7..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SoplogToken.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import com.gemstone.gemfire.internal.DSCODE;
-import com.gemstone.gemfire.internal.cache.EntryBits;
-
-/**
- * Defines serialized tokens for soplogs.
- */
-public enum SoplogToken {
-
- /** indicates the serialized value is a wildcard that compares equal to any other key */
- WILDCARD( DSCODE.WILDCARD ),
-
- /** indicates the serialized value is a tombstone of a deleted key */
- TOMBSTONE( EntryBits.setTombstone((byte)0, true) ),
-
- /** indicates the serialized value is an invalid token */
- INVALID( EntryBits.setInvalid((byte)0, true) ),
-
- /** indicates the serialized tombstone has been garbage collected (encoded with the local-invalid entry bit) */
- REMOVED_PHASE2( EntryBits.setLocalInvalid((byte)0, true) ),
-
- /** indicates the value is serialized */
- SERIALIZED( EntryBits.setSerialized((byte)0, true) );
-
- /** the serialized form of the token */
- private final byte val;
-
- private SoplogToken(byte val) {
- this.val = val;
- }
-
- @Override
- public String toString() {
- return super.toString()+" byte:"+val;
- }
-
- /**
- * Returns the serialized form of the token.
- * @return the byte
- */
- public byte toByte() {
- return val;
- }
-
- /**
- * Returns true if either of the serialized objects is a wildcard.
- * Only the single byte at each offset is compared against {@code DSCODE.WILDCARD}.
- * @param b1 the first object
- * @param off1 the first offset
- * @param b2 the second object
- * @param off2 the second offset
- * @return true if a wildcard
- */
- public static boolean isWildcard(byte[] b1, int off1, byte[] b2, int off2) {
- return b1[off1] == DSCODE.WILDCARD || b2[off2] == DSCODE.WILDCARD;
- }
-
- /**
- * Returns true if the serialized object is a tombstone.
- *
- * @param b the magic entry type byte
- * @return true if a tombstone
- */
- public static boolean isTombstone(byte b) {
- return EntryBits.isTombstone(b);
- }
-
- /**
- * Returns true if the serialized object is an invalid token.
- *
- * @param b the magic entry type byte
- * @return true if invalid
- */
- public static boolean isInvalid(byte b) {
- return EntryBits.isInvalid(b);
- }
-
- /**
- * Returns true if the serialized tombstone was garbage collected.
- * The check is delegated to {@code EntryBits.isLocalInvalid}, mirroring how REMOVED_PHASE2 is encoded.
- * @param b the magic entry type byte
- * @return true if RemovedPhase2
- */
- public static boolean isRemovedPhase2(byte b) {
- return EntryBits.isLocalInvalid(b);
- }
-
- /**
- * Returns true if the entry byte is flagged as a serialized value (i.e. not any token).
- *
- * @param b the magic entry type byte
- * @return true if not any token
- */
- public static boolean isSerialized(byte b) {
- return EntryBits.isSerialized(b);
- }
-}
-
-
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9438c8b1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedBuffer.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedBuffer.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedBuffer.java
deleted file mode 100644
index b301ac5..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedBuffer.java
+++ /dev/null
@@ -1,367 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.NavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogFactory.SortedOplogConfiguration;
-import com.gemstone.gemfire.internal.logging.LogService;
-
-/**
- * Provides an in-memory buffer to temporarily hold key/value pairs until they
- * can be flushed to disk. Each buffer instance can be optionally associated
- * with a user-specified tag for identification purposes.
- *
- * @param <T> the tag type
- * @author bakera
- */
-public class SortedBuffer<T> extends AbstractSortedReader {
- private static final Logger logger = LogService.getLogger();
-
- /** the tag */
- private final T tag;
-
- /** in-memory sorted key/value buffer */
- private final NavigableMap<byte[], byte[]> buffer;
-
- /** the stats */
- private final BufferStats stats;
-
- /** the metadata, set during flush */
- private final EnumMap<Metadata, byte[]> metadata;
-
- /** the command to run (or defer) when the flush is complete */
- private Runnable flushAction;
-
- private final String logPrefix;
-
- public SortedBuffer(SortedOplogConfiguration config, T tag) {
- assert config != null;
- assert tag != null;
-
- this.tag = tag;
-
- buffer = new ConcurrentSkipListMap<byte[], byte[]>(config.getComparator());
- stats = new BufferStats();
- metadata = new EnumMap<Metadata, byte[]>(Metadata.class);
-
- this.logPrefix = "<" + config.getName() + "#" + tag + "> ";
- }
-
- /**
- * Returns the tag associated with the buffer.
- * @return the tag
- */
- public T getTag() {
- return tag;
- }
-
- @Override
- public String toString() {
- return logger.getName() + this.logPrefix;
- }
-
- /**
- * Stores a key/value pair in the buffer, replacing any value already held for the key.
- * @param key the key
- * @param value the value
- */
- public void put(byte[] key, byte[] value) {
- final byte[] previous = buffer.put(key, value);
- // ASSUMPTION: an update barely changes the value length, so only brand-new keys are fed to the statistics
- if (previous == null) {
- stats.add(key.length, value.length);
- }
- }
-
- /**
- * Allows sorted iteration over the buffer contents.
- * @return the buffer entries
- */
- public Iterable<Entry<byte[], byte[]>> entries() {
- return buffer.entrySet();
- }
-
- /**
- * Returns the number of entries in the buffer.
- * @return the count
- */
- public int count() {
- return buffer.size();
- }
-
- /**
- * Returns the size of the data in bytes.
- * @return the data size
- */
- public long dataSize() {
- return stats.totalSize();
- }
-
- /**
- * Clears the buffer of all entries and resets the statistics, the metadata, and any deferred flush action.
- */
- public void clear() {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Clearing buffer", this.logPrefix);
- }
-
- buffer.clear();
- stats.clear();
- // metadata is a plain EnumMap; getMetadata()/setMetadata() guard it with this monitor, so clear it under the same lock
- synchronized (this) {
- metadata.clear();
- flushAction = null;
- }
- }
-
- /**
- * Returns true if the flush completion has been deferred.
- * @return true if deferred
- */
- public synchronized boolean isDeferred() {
- return flushAction != null;
- }
-
- /**
- * Defers the flush completion to a later time. This is used to ensure correct
- * ordering of soplogs during parallel flushes.
- *
- * @param action the action to perform when ready
- */
- public synchronized void defer(Runnable action) {
- assert flushAction == null;
-
- if (logger.isDebugEnabled()) {
- logger.debug("{}Deferring flush completion", this.logPrefix);
- }
- flushAction = action;
- }
-
- /**
- * Completes the deferred flush operation.
- */
- public synchronized void complete() {
- assert flushAction != null;
-
- try {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Completing deferred flush operation", this.logPrefix);
- }
- flushAction.run();
-
- } finally {
- flushAction = null;
- }
- }
-
- /**
- * Returns the buffer metadata.
- * @return the metadata
- */
- public synchronized EnumMap<Metadata, byte[]> getMetadata() {
- return metadata;
- }
-
- /**
- * Returns the metadata value for the given key.
- *
- * @param name the metadata name
- * @return the requested metadata
- */
- public synchronized byte[] getMetadata(Metadata name) {
- return metadata.get(name);
- }
-
- /**
- * Sets the metadata for the buffer. This is not available until the buffer
- * is about to be flushed.
- *
- * @param metadata the metadata
- */
- public synchronized void setMetadata(EnumMap<Metadata, byte[]> metadata) {
- if (metadata != null) {
- this.metadata.putAll(metadata);
- }
- }
-
- @Override
- public boolean mightContain(byte[] key) {
- return true;
- }
-
- @Override
- public ByteBuffer read(byte[] key) throws IOException {
- byte[] val = buffer.get(key);
- if (val != null) {
- return ByteBuffer.wrap(val);
- }
- return null;
- }
-
- @Override
- public SortedIterator<ByteBuffer> scan(
- byte[] from, boolean fromInclusive,
- byte[] to, boolean toInclusive,
- boolean ascending,
- MetadataFilter filter) {
-
- // a filter that rejects this buffer's metadata produces an empty scan
- if (filter != null && !filter.accept(metadata.get(filter.getName()))) {
- return new BufferIterator(Collections.<byte[], byte[]>emptyMap().entrySet().iterator());
- }
-
- NavigableMap<byte[],byte[]> view = ascending ? buffer : buffer.descendingMap();
- if (from != null && to != null) {
- view = view.subMap(from, fromInclusive, to, toInclusive);
- } else if (from != null) {
- view = view.tailMap(from, fromInclusive);
- } else if (to != null) {
- view = view.headMap(to, toInclusive);
- }
- return new BufferIterator(view.entrySet().iterator());
- }
-
- @Override
- public SerializedComparator getComparator() {
- return (SerializedComparator) buffer.comparator();
- }
-
- @Override
- public SortedStatistics getStatistics() {
- return stats;
- }
-
- @Override
- public void close() throws IOException {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Closing buffer", this.logPrefix);
- }
-
- synchronized (this) {
- flushAction = null;
- }
- }
-
- /**
- * Walks a sequence of buffer entries, presenting each key and value as a {@link ByteBuffer}.
- */
- public static class BufferIterator
- extends AbstractKeyValueIterator<ByteBuffer, ByteBuffer>
- implements SortedIterator<ByteBuffer>
- {
- /** supplies the entries being iterated */
- private final Iterator<Entry<byte[], byte[]>> entries;
-
- /** the entry selected by the latest step(), or null when there is none */
- private Entry<byte[], byte[]> current;
-
- public BufferIterator(Iterator<Entry<byte[], byte[]>> iterator) {
- entries = iterator;
- }
-
- @Override
- public ByteBuffer key() {
- return ByteBuffer.wrap(current.getKey());
- }
-
- @Override
- public ByteBuffer value() {
- return ByteBuffer.wrap(current.getValue());
- }
-
- @Override
- public void close() { /* nothing to release */ }
-
- @Override
- protected boolean step() {
- current = entries.hasNext() ? entries.next() : null;
- return current != null;
- }
- }
- /** Running size statistics for the enclosing buffer; the counters are guarded by this object's monitor. */
- private class BufferStats implements SortedStatistics {
- /** total bytes of keys plus values recorded through add() */
- private long totalSize;
-
- /** number of keys recorded through add() */
- private long keys;
-
- /** running mean of the recorded key lengths */
- private double avgKeySize;
-
- /** running mean of the recorded value lengths */
- private double avgValueSize;
-
- private synchronized void clear() {
- totalSize = 0;
- keys = 0;
- avgKeySize = 0;
- avgValueSize = 0;
- }
-
- private synchronized void add(int keyLength, int valueLength) {
- totalSize += (long) keyLength + valueLength; // widen first so the int sum cannot overflow
- avgKeySize = (keyLength + keys * avgKeySize) / (keys + 1);
- // the value average must fold in valueLength; feeding it keyLength would just mirror avgKeySize
- avgValueSize = (valueLength + keys * avgValueSize) / (keys + 1);
- keys++;
- }
-
- @Override
- public synchronized long keyCount() {
- return keys;
- }
-
- @Override
- public byte[] firstKey() {
- return buffer.firstKey(); // read from the live map; throws NoSuchElementException when it is empty
- }
-
- @Override
- public byte[] lastKey() {
- return buffer.lastKey(); // read from the live map; throws NoSuchElementException when it is empty
- }
-
- @Override
- public synchronized double avgKeySize() {
- return avgKeySize;
- }
-
- @Override
- public synchronized double avgValueSize() {
- return avgValueSize;
- }
-
- @Override
- public void close() {
- }
-
- public synchronized long totalSize() {
- return totalSize;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9438c8b1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplog.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplog.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplog.java
deleted file mode 100644
index 95fb411..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplog.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.EnumMap;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.Metadata;
-
-/**
- * Defines the API for reading and writing sorted key/value pairs. The keys
- * are expected to be lexicographically comparable {@code byte[]} arrays.
- *
- * @author bakera
- */
-public interface SortedOplog {
- /**
- * Checks if a key may be present in a set.
- */
- public interface BloomFilter {
- /**
- * Returns true if the bloom filter might contain the supplied key. The
- * nature of the bloom filter is such that false positives are allowed, but
- * false negatives cannot occur.
- *
- * @param key the key to test
- * @return true if the key might be present
- */
- boolean mightContain(byte[] key);
- }
-
- /**
- * Reads key/value pairs from the sorted file.
- */
- public interface SortedOplogReader extends SortedReader<ByteBuffer> {
- /**
- * Returns the bloom filter associated with this reader.
- * @return the bloom filter
- */
- BloomFilter getBloomFilter();
-
- /**
- * Returns the metadata value for the given key.
- *
- * @param name the metadata name
- * @return the requested metadata
- * @throws IOException error reading metadata
- */
- byte[] getMetadata(Metadata name) throws IOException;
-
- /**
- * Returns the file used to persist the soplog contents.
- * @return the file
- */
- File getFile();
-
- /**
- * @return the soplog file name
- */
- String getFileName();
-
- /**
- * Renames the file to the supplied name.
- * @param name the new file name
- * @throws IOException error renaming the file
- */
- void rename(String name) throws IOException;
-
- /**
- * @return the modification timestamp of the file
- * @throws IOException error reading the timestamp
- */
- long getModificationTimeStamp() throws IOException;
-
- /**
- * Deletes the sorted oplog file.
- */
- public void delete() throws IOException;
-
- /**
- * Returns true if the reader is closed.
- * @return true if closed
- */
- boolean isClosed();
- }
-
- /**
- * Writes key/value pairs in a sorted manner. Each entry that is appended
- * must have a key that is greater than or equal to the previous key.
- */
- public interface SortedOplogWriter {
- /**
- * Appends another key and value. The key is expected to be greater than
- * or equal to the last key that was appended.
- *
- * @param key the key
- * @param value the value
- * @throws IOException write error
- */
- void append(ByteBuffer key, ByteBuffer value) throws IOException;
-
- /**
- * Appends another key and value. The key is expected to be greater than
- * or equal to the last key that was appended.
- *
- * @param key the key
- * @param value the value
- * @throws IOException write error
- */
- void append(byte[] key, byte[] value) throws IOException;
-
- /**
- * Closes the file, first writing optional user and system metadata.
- *
- * @param metadata the metadata to include
- * @throws IOException unable to close file
- */
- void close(EnumMap<Metadata, byte[]> metadata) throws IOException;
-
- /**
- * Invoked to close and remove the file to clean up after an error.
- * @throws IOException error closing
- */
- void closeAndDelete() throws IOException;
- }
-
- /**
- * Creates a new sorted reader.
- *
- * @return the reader
- * @throws IOException error creating reader
- */
- SortedOplogReader createReader() throws IOException;
-
- /**
- * Creates a new sorted writer.
- *
- * @return the writer
- * @throws IOException error creating writer
- */
- SortedOplogWriter createWriter() throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9438c8b1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogFactory.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogFactory.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogFactory.java
deleted file mode 100644
index a470d7e..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogFactory.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.EnumMap;
-
-import org.apache.hadoop.hbase.io.hfile.BlockCache;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.Compactor.MetadataCompactor;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.Metadata;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SerializedComparator;
-
-/**
- * Provides a means to construct a soplog.
- */
-public interface SortedOplogFactory {
- /**
- * Configures a <code>SortedOplog</code>.
- *
- * @author bakera
- */
- public class SortedOplogConfiguration {
- /** the default metadata compactor */
- public static MetadataCompactor DEFAULT_METADATA_COMPACTOR = new MetadataCompactor() {
- @Override
- public byte[] compact(byte[] metadata1, byte[] metadata2) {
- return metadata1;
- }
- };
-
- /**
- * Defines the available checksum algorithms.
- */
- public enum Checksum {
- NONE,
- CRC32
- }
-
- /**
- * Defines the available compression algorithms.
- */
- public enum Compression {
- NONE,
- }
-
- /**
- * Defines the available key encodings.
- */
- public enum KeyEncoding {
- NONE,
- }
-
- /** the soplog name */
- private final String name;
-
- /** the statistics */
- private final SortedOplogStatistics stats;
-
- private final HFileStoreStatistics storeStats;
-
- /** true if bloom filters are enabled */
- private boolean bloom;
-
- /** the soplog block size */
- private int blockSize;
-
- /** the number of bytes for each checksum */
- private int bytesPerChecksum;
-
- /** the checksum type */
- private Checksum checksum;
-
- /** the compression type */
- private Compression compression;
-
- /** the key encoding type */
- private KeyEncoding keyEncoding;
-
- /** the comparator */
- private SerializedComparator comparator;
-
- /** metadata compactors, keyed by metadata name */
- private EnumMap<Metadata, MetadataCompactor> metaCompactors;
-
- private BlockCache blockCache;
-
- private boolean cacheDataBlocksOnRead;
-
- public SortedOplogConfiguration(String name) {
- this(name, null, new SortedOplogStatistics("GridDBRegionStatistics", name), new HFileStoreStatistics("GridDBStoreStatistics", name));
- }
-
- public SortedOplogConfiguration(String name, BlockCache blockCache, SortedOplogStatistics stats, HFileStoreStatistics storeStats) {
- this.name = name;
- this.stats = stats;
-
- // defaults
- bloom = true;
- blockSize = 1 << 16;
- bytesPerChecksum = 1 << 14;
- checksum = Checksum.NONE;
- compression = Compression.NONE;
- keyEncoding = KeyEncoding.NONE;
- comparator = new ByteComparator();
- this.cacheDataBlocksOnRead = true;
- this.storeStats = storeStats;
- this.blockCache = blockCache;
- }
-
- public SortedOplogConfiguration setBloomFilterEnabled(boolean enabled) {
- this.bloom = enabled;
- return this;
- }
-
- public SortedOplogConfiguration setBlockSize(int size) {
- this.blockSize = size;
- return this;
- }
-
- public SortedOplogConfiguration setBytesPerChecksum(int bytes) {
- this.bytesPerChecksum = bytes;
- return this;
- }
-
- public SortedOplogConfiguration setChecksum(Checksum type) {
- this.checksum = type;
- return this;
- }
-
- public SortedOplogConfiguration setCompression(Compression type) {
- this.compression = type;
- return this;
- }
-
- public SortedOplogConfiguration setKeyEncoding(KeyEncoding type) {
- this.keyEncoding = type;
- return this;
- }
-
- public SortedOplogConfiguration setComparator(SerializedComparator comp) {
- this.comparator = comp;
- return this;
- }
-
- public SortedOplogConfiguration addMetadataCompactor(Metadata name, MetadataCompactor compactor) {
- metaCompactors.put(name, compactor);
- return this;
- }
-
- /**
- * Returns the soplog name.
- * @return the name
- */
- public String getName() {
- return name;
- }
-
- /**
- * Returns the statistics.
- * @return the statistics
- */
- public SortedOplogStatistics getStatistics() {
- return stats;
- }
-
- public HFileStoreStatistics getStoreStatistics() {
- return storeStats;
- }
-
- /**
- * Returns true if the bloom filter is enabled.
- * @return true if enabled
- */
- public boolean isBloomFilterEnabled() {
- return bloom;
- }
-
- /**
- * Returns the block size in bytes.
- * @return the block size
- */
- public int getBlockSize() {
- return blockSize;
- }
-
- /**
- * Returns the number of bytes per checksum.
- * @return the bytes
- */
- public int getBytesPerChecksum() {
- return bytesPerChecksum;
- }
-
- /**
- * Returns the checksum type.
- * @return the checksum
- */
- public Checksum getChecksum() {
- return checksum;
- }
-
- /**
- * Returns the compression type.
- * @return the compression
- */
- public Compression getCompression() {
- return compression;
- }
-
- /**
- * Returns the key encoding type.
- * @return the key encoding
- */
- public KeyEncoding getKeyEncoding() {
- return keyEncoding;
- }
-
- /**
- * Returns the comparator.
- * @return the comparator
- */
- public SerializedComparator getComparator() {
- return comparator;
- }
-
- /**
- * Returns the metadata compactor for the given name.
- * @param name the metadata name
- * @return the compactor
- */
- public MetadataCompactor getMetadataCompactor(Metadata name) {
- MetadataCompactor mc = metaCompactors.get(name);
- if (mc != null) {
- return mc;
- }
- return DEFAULT_METADATA_COMPACTOR;
- }
-
- public BlockCache getBlockCache() {
- return this.blockCache;
- }
-
- public boolean getCacheDataBlocksOnRead() {
- return cacheDataBlocksOnRead ;
- }
- }
-
- /**
- * Returns the configuration.
- * @return the configuration
- */
- SortedOplogConfiguration getConfiguration();
-
- /**
- * Creates a new soplog.
- *
- * @param name the filename
- * @return the soplog
- * @throws IOException error creating soplog
- */
- SortedOplog createSortedOplog(File name) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9438c8b1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogSet.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogSet.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogSet.java
deleted file mode 100644
index 2900229..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogSet.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.EnumMap;
-
-/**
- * Provides a unified view of the current SBuffer, the unflushed SBuffers, and
- * the existing soplogs.
- *
- * @author bakera
- */
-public interface SortedOplogSet extends SortedReader<ByteBuffer> {
- /**
- * Defines a callback handler for asynchronous operations.
- */
- public interface FlushHandler {
- /**
- * Invoked when the operation completed successfully.
- */
- void complete();
-
- /**
- * Invoked when the operation completed with an error.
- * @param t the error
- */
- void error(Throwable t);
- }
-
- /**
- * Inserts or updates an entry in the current buffer. This invocation may
- * block if the current buffer is full and there are too many outstanding
- * write requests.
- *
- * @param key the key
- * @param value the value
- * @throws IOException error inserting the entry
- */
- void put(byte[] key, byte[] value) throws IOException;
-
- /**
- * Returns the size of the current buffer in bytes.
- * @return the buffer size
- */
- long bufferSize();
-
- /**
- * Returns the size of the unflushed buffers in bytes.
- * @return the unflushed size
- */
- long unflushedSize();
-
- /**
- * Requests that the current buffer be flushed to disk. This invocation may
- * block if there are too many outstanding write requests.
- *
- * @param metadata supplemental data to be included in the soplog
- * @param handler the flush completion callback
- * @throws IOException error preparing flush
- */
- void flush(EnumMap<Metadata, byte[]> metadata, FlushHandler handler) throws IOException;
-
- /**
- * Flushes the current buffer and closes the soplog set. Blocks until the flush
- * is completed.
- *
- * @param metadata supplemental data to be included in the soplog
- * @throws IOException error during flush
- */
- void flushAndClose(EnumMap<Metadata, byte[]> metadata) throws IOException;
-
- /**
- * Returns the configured compaction strategy.
- * @return the compactor
- */
- Compactor getCompactor();
-
- /**
- * Clears the current buffer, any existing buffers, and all active soplogs.
- *
- * @throws IOException unable to clear
- */
- void clear() throws IOException;
-
- /**
- * Clears the existing contents and closes the soplog set.
- * @throws IOException unable to destroy
- */
- void destroy() throws IOException;
-
- /**
- * Returns true if the set is closed.
- * @return true if closed
- */
- boolean isClosed();
-
- /**
- * Returns the soplog factory.
- * @return the factory
- */
- SortedOplogFactory getFactory();
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9438c8b1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogSetImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogSetImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogSetImpl.java
deleted file mode 100644
index 2cf1191..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogSetImpl.java
+++ /dev/null
@@ -1,780 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Deque;
-import java.util.EnumMap;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplog.SortedOplogReader;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplog.SortedOplogWriter;
-import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.util.AbortableTaskService;
-import com.gemstone.gemfire.internal.util.AbortableTaskService.AbortableTask;
-
-/**
- * Provides a unified view across a set of sbuffers and soplogs. Updates are
- * made into the current sbuffer. When requested, the current sbuffer will be
- * flushed and subsequent updates will flow into a new sbuffer. All flushes are
- * done on a background thread.
- *
- * @author bakera
- */
-public class SortedOplogSetImpl extends AbstractSortedReader implements SortedOplogSet {
- private static final Logger logger = LogService.getLogger();
-
- /** creates new soplogs */
- private final SortedOplogFactory factory;
-
- /** the background flush thread pool */
- private final AbortableTaskService flusher;
-
- /** the compactor */
- private final Compactor compactor;
-
- /** the current sbuffer */
- private final AtomicReference<SortedBuffer<Integer>> current;
-
- /** the buffer count */
- private final AtomicInteger bufferCount;
-
- /** the unflushed sbuffers */
- private final Deque<SortedBuffer<Integer>> unflushed;
-
- /** the lock for access to unflushed and soplogs */
- private final ReadWriteLock rwlock;
-
- /** test hook for clear/close/destroy during flush */
- volatile CountDownLatch testDelayDuringFlush;
-
- /** test hook to cause IOException during flush */
- volatile boolean testErrorDuringFlush;
-
- private final String logPrefix;
-
-  /**
-   * Creates a soplog set whose current buffer is a new, empty buffer with id 0.
-   *
-   * @param factory the factory used to create soplogs and to obtain the configuration
-   * @param exec the executor wrapped by the background flush service
-   * @param ctor the compactor that tracks the soplogs
-   * @throws IOException if the set cannot be created
-   */
-  public SortedOplogSetImpl(final SortedOplogFactory factory, Executor exec, Compactor ctor) throws IOException {
-    this.factory = factory;
-    this.flusher = new AbortableTaskService(exec);
-    this.compactor = ctor;
-
-    this.rwlock = new ReentrantReadWriteLock();
-    this.bufferCount = new AtomicInteger(0);
-    this.unflushed = new ArrayDeque<SortedBuffer<Integer>>();
-
-    SortedBuffer<Integer> initial = new SortedBuffer<Integer>(factory.getConfiguration(), 0);
-    this.current = new AtomicReference<SortedBuffer<Integer>>(initial);
-
-    this.logPrefix = "<" + factory.getConfiguration().getName() + "> ";
-    if (logger.isDebugEnabled()) {
-      logger.debug("{}Creating soplog set", this.logPrefix);
-    }
-  }
-
-  /**
-   * Tests whether any reader in the set may contain the key. The current
-   * buffer is checked first; after that a snapshot of the unflushed buffers
-   * followed by the active soplogs is checked in order.
-   *
-   * @param key the serialized key
-   * @return true as soon as one reader reports a possible match, otherwise false
-   * @throws IOException if a reader fails
-   */
-  @Override
-  public boolean mightContain(byte[] key) throws IOException {
-    // fast path: the current buffer is consulted before taking the read lock
-    if (getCurrent().mightContain(key)) {
-      return true;
-    }
-
-    // copy the readers under the read lock so iteration below is stable
-    List<SortedReader<ByteBuffer>> snapshot;
-    Collection<TrackedReference<SortedOplogReader>> tracked;
-    rwlock.readLock().lock();
-    try {
-      snapshot = new ArrayList<SortedReader<ByteBuffer>>(unflushed);
-      tracked = compactor.getActiveReaders(key, key);
-      for (TrackedReference<SortedOplogReader> ref : tracked) {
-        snapshot.add(ref.get());
-      }
-    } finally {
-      rwlock.readLock().unlock();
-    }
-
-    try {
-      boolean found = false;
-      for (SortedReader<ByteBuffer> candidate : snapshot) {
-        if (candidate.mightContain(key)) {
-          found = true;
-          break;
-        }
-      }
-      return found;
-    } finally {
-      // release the references obtained from the compactor
-      TrackedReference.decrementAll(tracked);
-    }
-  }
-
-  /**
-   * Looks up the value for a key. The current buffer is consulted first, then
-   * a snapshot of the unflushed buffers followed by the active soplogs; the
-   * first non-null value wins.
-   *
-   * @param key the serialized key
-   * @return the first value found, or null if no reader returned one
-   * @throws IOException if a reader fails
-   */
-  @Override
-  public ByteBuffer read(byte[] key) throws IOException {
-    ByteBuffer result = getCurrent().read(key);
-    if (result != null) {
-      return result;
-    }
-
-    // copy the readers under the read lock so iteration below is stable
-    List<SortedReader<ByteBuffer>> snapshot;
-    Collection<TrackedReference<SortedOplogReader>> tracked;
-    rwlock.readLock().lock();
-    try {
-      snapshot = new ArrayList<SortedReader<ByteBuffer>>(unflushed);
-      tracked = compactor.getActiveReaders(key, key);
-      for (TrackedReference<SortedOplogReader> ref : tracked) {
-        snapshot.add(ref.get());
-      }
-    } finally {
-      rwlock.readLock().unlock();
-    }
-
-    try {
-      for (SortedReader<ByteBuffer> candidate : snapshot) {
-        // skip readers that report the key as absent
-        if (!candidate.mightContain(key)) {
-          continue;
-        }
-        result = candidate.read(key);
-        if (result != null) {
-          return result;
-        }
-      }
-      return null;
-    } finally {
-      // release the references obtained from the compactor
-      TrackedReference.decrementAll(tracked);
-    }
-  }
-
-  /**
-   * Creates a merged iterator over the current buffer, the unflushed buffers
-   * and the active soplogs for the requested key range.
-   * <p>
-   * The tracked soplog references are handed to the returned iterator, which
-   * releases them in its {@code close()}. If this method fails before the
-   * iterator is returned, the iterators opened so far are closed and the
-   * tracked references are released here, so nothing leaks on error.
-   *
-   * @param from the start key
-   * @param fromInclusive true if the start key is included
-   * @param to the end key
-   * @param toInclusive true if the end key is included
-   * @param ascending true for ascending order; otherwise the comparator is reversed
-   * @param filter passed through to each underlying scan
-   * @return the merged iterator
-   * @throws IOException if an underlying scan cannot be created
-   */
-  @Override
-  public SortedIterator<ByteBuffer> scan(
-      byte[] from, boolean fromInclusive,
-      byte[] to, boolean toInclusive,
-      boolean ascending,
-      MetadataFilter filter) throws IOException {
-
-    SerializedComparator sc = factory.getConfiguration().getComparator();
-    sc = ascending ? sc : ReversingSerializedComparator.reverse(sc);
-
-    List<SortedIterator<ByteBuffer>> scans = new ArrayList<SortedIterator<ByteBuffer>>();
-    Collection<TrackedReference<SortedOplogReader>> soplogs = null;
-    boolean success = false;
-    try {
-      rwlock.readLock().lock();
-      try {
-        scans.add(getCurrent().scan(from, fromInclusive, to, toInclusive, ascending, filter));
-        for (SortedBuffer<Integer> sb : unflushed) {
-          scans.add(sb.scan(from, fromInclusive, to, toInclusive, ascending, filter));
-        }
-        soplogs = compactor.getActiveReaders(from, to);
-      } finally {
-        rwlock.readLock().unlock();
-      }
-
-      for (TrackedReference<SortedOplogReader> tr : soplogs) {
-        scans.add(tr.get().scan(from, fromInclusive, to, toInclusive, ascending, filter));
-      }
-
-      SortedIterator<ByteBuffer> merged = new MergedIterator(sc, soplogs, scans);
-      success = true;
-      return merged;
-
-    } finally {
-      if (!success) {
-        // ownership was never transferred to a MergedIterator: clean up here
-        try {
-          for (SortedIterator<ByteBuffer> opened : scans) {
-            opened.close();
-          }
-        } finally {
-          if (soplogs != null) {
-            TrackedReference.decrementAll(soplogs);
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * Stores a key/value pair in the current buffer and records the operation
-   * in the put statistics.
-   *
-   * @param key the serialized key, must not be null
-   * @param value the serialized value, must not be null
-   */
-  @Override
-  public void put(byte[] key, byte[] value) {
-    assert key != null;
-    assert value != null;
-
-    final long began = factory.getConfiguration().getStatistics().getPut().begin();
-    getCurrent().put(key, value);
-    factory.getConfiguration().getStatistics().getPut().end(value.length, began);
-  }
-
-  /**
-   * Returns the data size reported by the current buffer.
-   *
-   * @return the current buffer's data size
-   */
-  @Override
-  public long bufferSize() {
-    SortedBuffer<Integer> active = getCurrent();
-    return active.dataSize();
-  }
-
-  /**
-   * Returns the combined data size of all buffers waiting to be flushed.
-   * The queue is read while holding the read lock.
-   *
-   * @return the sum of the data sizes of the unflushed buffers
-   */
-  @Override
-  public long unflushedSize() {
-    rwlock.readLock().lock();
-    try {
-      long total = 0;
-      for (SortedBuffer<Integer> pending : unflushed) {
-        total += pending.dataSize();
-      }
-      return total;
-    } finally {
-      rwlock.readLock().unlock();
-    }
-  }
-
-  /**
-   * Requests a flush of the current buffer, closes the set, and then reports
-   * any error the flush handler received.
-   *
-   * @param metadata supplemental data passed to {@code flush}
-   * @throws IOException wrapping the throwable reported to the flush handler,
-   *         or thrown by {@code close()}
-   */
-  @Override
-  public void flushAndClose(EnumMap<Metadata, byte[]> metadata) throws IOException {
-    final AtomicReference<Throwable> failure = new AtomicReference<Throwable>(null);
-    FlushHandler recorder = new FlushHandler() {
-      @Override
-      public void complete() {
-      }
-
-      @Override
-      public void error(Throwable t) {
-        failure.set(t);
-      }
-    };
-    flush(metadata, recorder);
-
-    // close() waits for the flush service to complete
-    close();
-
-    Throwable cause = failure.get();
-    if (cause == null) {
-      return;
-    }
-    throw new IOException(cause);
-  }
-
-  /**
-   * Swaps in a new current buffer and queues the previous one for a
-   * background flush.
-   * <p>
-   * If the set is closed, or the previous buffer is empty, no flush task is
-   * queued: the handler is completed immediately and the flush statistic that
-   * was begun on entry is ended with a size of 0, so every {@code begin()}
-   * has a matching {@code end()}.
-   *
-   * @param metadata supplemental data attached to the buffer being flushed
-   * @param handler the completion callback, must not be null
-   */
-  @Override
-  public void flush(EnumMap<Metadata, byte[]> metadata, FlushHandler handler) {
-    assert handler != null;
-
-    long start = factory.getConfiguration().getStatistics().getFlush().begin();
-
-    // flip to a new buffer
-    final SortedBuffer<Integer> sb;
-    rwlock.writeLock().lock();
-    try {
-      if (isClosed()) {
-        handler.complete();
-        factory.getConfiguration().getStatistics().getFlush().end(0, start);
-
-        return;
-      }
-
-      sb = flipBuffer();
-      if (sb.count() == 0) {
-        if (logger.isDebugEnabled()) {
-          logger.debug("{}Skipping flush of empty buffer {}", this.logPrefix, sb);
-        }
-        handler.complete();
-        // nothing was written; close out the statistic begun above
-        factory.getConfiguration().getStatistics().getFlush().end(0, start);
-        return;
-      }
-
-      sb.setMetadata(metadata);
-      unflushed.addFirst(sb);
-
-      // Note: this is queued while holding the lock to ensure correct ordering
-      // on the executor queue. Don't use a bounded queue here or we will block
-      // the flush invoker.
-      flusher.execute(new FlushTask(handler, sb, start));
-
-    } finally {
-      rwlock.writeLock().unlock();
-    }
-  }
-
- /**
-  * Clears the current buffer, aborts queued flushes, clears every unflushed
-  * buffer and the compactor, and then waits for the flush service to finish.
-  * All of this happens while holding the write lock.
-  *
-  * @throws IOException if clearing fails; the clear statistic is marked as an error
-  */
- @Override
- public void clear() throws IOException {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Clearing soplog set", this.logPrefix);
- }
-
- long start = factory.getConfiguration().getStatistics().getClear().begin();
-
- // acquire lock to ensure consistency with flushes
- rwlock.writeLock().lock();
- try {
- // current may already be null (destroy() unsets it before calling clear())
- SortedBuffer<Integer> tmp = current.get();
- if (tmp != null) {
- tmp.clear();
- }
-
- // abort before waiting: running flush tasks poll the abort flag while trying
- // to take the write lock held here (see FlushTask.lockOrAbort)
- flusher.abortAll();
- for (SortedBuffer<Integer> sb : unflushed) {
- sb.clear();
- }
-
- unflushed.clear();
- compactor.clear();
-
- // let a flush blocked on the test latch proceed so the wait below can finish
- releaseTestDelay();
- flusher.waitForCompletion();
- factory.getConfiguration().getStatistics().getClear().end(start);
-
- } catch (IOException e) {
- factory.getConfiguration().getStatistics().getClear().error(start);
- // NOTE(review): fillInStackTrace() replaces the original stack trace with this rethrow point
- throw (IOException) e.fillInStackTrace();
-
- } finally {
- rwlock.writeLock().unlock();
- }
- }
-
-  /**
-   * Destroys the set: unsets the current buffer, clears all data, and closes
-   * the set, recording the outcome in the destroy statistics.
-   *
-   * @throws IOException if clearing or closing fails
-   */
-  @Override
-  public void destroy() throws IOException {
-    if (logger.isDebugEnabled()) {
-      logger.debug("{}Destroying soplog set", this.logPrefix);
-    }
-
-    final long began = factory.getConfiguration().getStatistics().getDestroy().begin();
-    try {
-      // marks the set closed before any data is removed
-      unsetCurrent();
-      clear();
-      close();
-
-      factory.getConfiguration().getStatistics().getDestroy().end(began);
-
-    } catch (IOException failure) {
-      factory.getConfiguration().getStatistics().getDestroy().error(began);
-      throw (IOException) failure.fillInStackTrace();
-    }
-  }
-
-  /**
-   * Closes the set: unsets the current buffer, releases the test latch if one
-   * is installed, waits for the flush service to complete, and closes the
-   * compactor.
-   *
-   * @throws IOException if the compactor cannot be closed
-   */
-  @Override
-  public void close() throws IOException {
-    if (logger.isDebugEnabled()) {
-      logger.debug("{}Closing soplog set", this.logPrefix);
-    }
-
-    // after this point isClosed() reports true
-    unsetCurrent();
-    releaseTestDelay();
-
-    // outstanding flushes finish before the compactor is closed
-    flusher.waitForCompletion();
-    compactor.close();
-  }
-
-  /**
-   * Returns the comparator taken from the factory configuration.
-   *
-   * @return the configured comparator
-   */
-  @Override
-  public SerializedComparator getComparator() {
-    SerializedComparator configured = factory.getConfiguration().getComparator();
-    return configured;
-  }
-
-  /**
-   * Collects the statistics of the current buffer, the unflushed buffers and
-   * the active soplogs into a merged view.
-   * <p>
-   * The tracked soplog references are handed to the returned statistics
-   * object, which releases them in its {@code close()}. If collecting the
-   * soplog statistics fails, the references are released here instead so they
-   * do not leak.
-   *
-   * @return the merged statistics
-   * @throws IOException if a reader cannot provide its statistics
-   */
-  @Override
-  public SortedStatistics getStatistics() throws IOException {
-    List<SortedStatistics> stats = new ArrayList<SortedStatistics>();
-    Collection<TrackedReference<SortedOplogReader>> soplogs;
-
-    // snapshot, this is expensive
-    rwlock.readLock().lock();
-    try {
-      stats.add(getCurrent().getStatistics());
-      for (SortedBuffer<Integer> sb : unflushed) {
-        stats.add(sb.getStatistics());
-      }
-      soplogs = compactor.getActiveReaders(null, null);
-    } finally {
-      rwlock.readLock().unlock();
-    }
-
-    boolean success = false;
-    try {
-      for (TrackedReference<SortedOplogReader> tr : soplogs) {
-        stats.add(tr.get().getStatistics());
-      }
-      SortedStatistics merged = new MergedStatistics(stats, soplogs);
-      success = true;
-      return merged;
-    } finally {
-      if (!success) {
-        // ownership was never transferred to a MergedStatistics
-        TrackedReference.decrementAll(soplogs);
-      }
-    }
-  }
-
-  /**
-   * Returns the compactor supplied at construction.
-   *
-   * @return the compactor
-   */
-  @Override
-  public Compactor getCompactor() {
-    return this.compactor;
-  }
-
-  /**
-   * Reports whether the set is closed, i.e. whether the current buffer
-   * reference has been unset.
-   *
-   * @return true if there is no current buffer
-   */
-  @Override
-  public boolean isClosed() {
-    return null == current.get();
-  }
-
-  /**
-   * Returns the soplog factory supplied at construction.
-   *
-   * @return the factory
-   */
-  @Override
-  public SortedOplogFactory getFactory() {
-    return this.factory;
-  }
-
-  /**
-   * Replaces the current buffer with a new one whose id is the next buffer
-   * count.
-   *
-   * @return the buffer that was current before the switch
-   * @throws IllegalStateException if the set is closed
-   */
-  private SortedBuffer<Integer> flipBuffer() {
-    final SortedBuffer<Integer> previous = getCurrent();
-    final SortedBuffer<Integer> replacement =
-        new SortedBuffer<Integer>(factory.getConfiguration(), bufferCount.incrementAndGet());
-
-    current.set(replacement);
-    if (logger.isDebugEnabled()) {
-      logger.debug("{}Switching from buffer {} to {}", this.logPrefix, previous, replacement);
-    }
-    return previous;
-  }
-
-  /**
-   * Returns the current buffer.
-   *
-   * @return the current buffer, never null
-   * @throws IllegalStateException if the current buffer has been unset
-   */
-  private SortedBuffer<Integer> getCurrent() {
-    final SortedBuffer<Integer> active = current.get();
-    if (active != null) {
-      return active;
-    }
-    throw new IllegalStateException("Closed");
-  }
-
-  /**
-   * Atomically removes the current buffer under the write lock and clears the
-   * buffer that was removed, if there was one.
-   */
-  private void unsetCurrent() {
-    rwlock.writeLock().lock();
-    try {
-      final SortedBuffer<Integer> removed = current.getAndSet(null);
-      if (removed == null) {
-        return;
-      }
-      removed.clear();
-    } finally {
-      rwlock.writeLock().unlock();
-    }
-  }
-
-  /**
-   * Counts down the test latch, if one is installed, so that a flush waiting
-   * on it can continue.
-   */
-  private void releaseTestDelay() {
-    if (testDelayDuringFlush == null) {
-      return;
-    }
-
-    if (logger.isDebugEnabled()) {
-      logger.debug("{}Releasing testDelayDuringFlush", this.logPrefix);
-    }
-    testDelayDuringFlush.countDown();
-  }
-
- /**
-  * Background task that writes one unflushed buffer to a new soplog, hands the
-  * soplog to the compactor, and removes the buffer from the unflushed queue.
-  */
- private class FlushTask implements AbortableTask {
- /** the callback notified of completion or error */
- private final FlushHandler handler;
- /** the buffer being flushed */
- private final SortedBuffer<Integer> buffer;
- /** the value returned by the flush statistic's begin() call */
- private final long start;
-
- public FlushTask(FlushHandler handler, SortedBuffer<Integer> buffer, long start) {
- this.handler = handler;
- this.buffer = buffer;
- this.start = start;
- }
-
- /**
-  * Writes the buffer to a soplog and, while holding the write lock, either
-  * runs the completion action or defers it on the buffer.
-  *
-  * @param aborted set to true when the task should stop
-  */
- @Override
- public void runOrAbort(final AtomicBoolean aborted) {
- try {
- // First transfer the contents of the buffer to a new soplog.
- final SortedOplog soplog = writeBuffer(buffer, aborted);
-
- // If we are aborted, someone else will cleanup the unflushed queue
- // NOTE(review): when the soplog was written but the lock is not obtained, the
- // soplog is neither added to the compactor nor deleted here - confirm cleanup
- if (soplog == null || !lockOrAbort(aborted)) {
- handler.complete();
- return;
- }
-
- // the write lock is held from here until the finally block below
- try {
- Runnable action = new Runnable() {
- @Override
- public void run() {
- try {
- compactor.add(soplog);
- compactor.compact(false, null);
-
- unflushed.removeFirstOccurrence(buffer);
-
- // TODO need to invoke this while NOT holding write lock
- handler.complete();
- factory.getConfiguration().getStatistics().getFlush().end(buffer.dataSize(), start);
-
- } catch (Exception e) {
- handleError(e, aborted);
- return;
- }
- }
- };
-
- // Enforce flush ordering for consistency. If the previous buffer flush
- // is incomplete, we defer completion and release the thread to avoid
- // deadlocks.
- // (flush() adds buffers with addFirst, so peekLast is the oldest one)
- if (buffer == unflushed.peekLast()) {
- action.run();
-
- // run the completions that newer buffers deferred while waiting on this one
- SortedBuffer<Integer> tail = unflushed.peekLast();
- while (tail != null && tail.isDeferred() && !aborted.get()) {
- // TODO need to invoke this while NOT holding write lock
- tail.complete();
- tail = unflushed.peekLast();
- }
- } else {
- buffer.defer(action);
- }
- } finally {
- rwlock.writeLock().unlock();
- }
- } catch (Exception e) {
- handleError(e, aborted);
- }
- }
-
- /** Completes the handler and ends the flush statistic without writing anything. */
- @Override
- public void abortBeforeRun() {
- handler.complete();
- factory.getConfiguration().getStatistics().getFlush().end(start);
- }
-
- /**
-  * Removes the buffer from the unflushed queue if the write lock can be
-  * obtained, then reports the error to the handler and the flush statistic.
-  */
- private void handleError(Exception e, AtomicBoolean aborted) {
- if (lockOrAbort(aborted)) {
- try {
- unflushed.removeFirstOccurrence(buffer);
- } finally {
- rwlock.writeLock().unlock();
- }
- }
-
- handler.error(e);
- factory.getConfiguration().getStatistics().getFlush().error(start);
- }
-
- /**
-  * Appends every entry of the buffer to a newly created soplog.
-  *
-  * @return the written soplog, or null if the task was aborted while writing
-  * @throws IOException if writing fails; the writer is closed and deleted first
-  */
- private SortedOplog writeBuffer(SortedBuffer<Integer> sb, AtomicBoolean aborted)
- throws IOException {
- File f = compactor.getFileset().getNextFilename();
- if (logger.isDebugEnabled()) {
- logger.debug("{}Flushing buffer {} to {}", SortedOplogSetImpl.this.logPrefix, sb, f);
- }
-
- SortedOplog so = factory.createSortedOplog(f);
- SortedOplogWriter writer = so.createWriter();
- try {
- if (testErrorDuringFlush) {
- throw new IOException("Flush error due to testErrorDuringFlush=true");
- }
-
- for (Entry<byte[], byte[]> entry : sb.entries()) {
- // the abort flag is checked before each entry is appended
- if (aborted.get()) {
- writer.closeAndDelete();
- return null;
- }
- writer.append(entry.getKey(), entry.getValue());
- }
-
- checkTestDelay();
-
- writer.close(buffer.getMetadata());
- return so;
-
- } catch (IOException e) {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Encountered error while flushing buffer {}", SortedOplogSetImpl.this.logPrefix, sb, e);
- }
-
- // NOTE(review): only IOException is handled here; a RuntimeException would
- // leave the writer open - confirm whether that can happen
- writer.closeAndDelete();
- throw e;
- }
- }
-
- /** Blocks on the test latch, if one is installed, until it is released. */
- private void checkTestDelay() {
- if (testDelayDuringFlush != null) {
- try {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Waiting for testDelayDuringFlush", SortedOplogSetImpl.this.logPrefix);
- }
- testDelayDuringFlush.await();
-
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
- /**
-  * Tries to take the write lock in 10 ms attempts until it succeeds or the
-  * abort flag is set.
-  *
-  * @return true if the write lock is held on return; false if aborted or interrupted
-  */
- private boolean lockOrAbort(AtomicBoolean abort) {
- try {
- while (!abort.get()) {
- if (rwlock.writeLock().tryLock(10, TimeUnit.MILLISECONDS)) {
- return true;
- }
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- return false;
- }
- }
-
-  /**
-   * Combines the statistics of several readers into a single view and holds
-   * the tracked soplog references until {@link #close()} is called.
-   */
-  private class MergedStatistics implements SortedStatistics {
-    /** the per-reader statistics */
-    private final List<SortedStatistics> stats;
-
-    /** the reference counted soplogs released on close */
-    private final Collection<TrackedReference<SortedOplogReader>> soplogs;
-
-    public MergedStatistics(List<SortedStatistics> stats, Collection<TrackedReference<SortedOplogReader>> soplogs) {
-      this.soplogs = soplogs;
-      this.stats = stats;
-    }
-
-    /**
-     * Sums the key counts of all readers.
-     */
-    @Override
-    public long keyCount() {
-      // TODO we have no way of determining the overall key population
-      // just assume no overlap for now
-      long total = 0;
-      for (SortedStatistics each : stats) {
-        total += each.keyCount();
-      }
-      return total;
-    }
-
-    /**
-     * Returns the smallest first key across all readers, as ordered by the
-     * set's comparator.
-     */
-    @Override
-    public byte[] firstKey() {
-      byte[] smallest = stats.get(0).firstKey();
-      for (SortedStatistics each : stats.subList(1, stats.size())) {
-        byte[] candidate = each.firstKey();
-        if (getComparator().compare(smallest, candidate) > 0) {
-          smallest = candidate;
-        }
-      }
-      return smallest;
-    }
-
-    /**
-     * Returns the largest last key across all readers, as ordered by the
-     * set's comparator.
-     */
-    @Override
-    public byte[] lastKey() {
-      byte[] largest = stats.get(0).lastKey();
-      for (SortedStatistics each : stats.subList(1, stats.size())) {
-        byte[] candidate = each.lastKey();
-        if (getComparator().compare(largest, candidate) < 0) {
-          largest = candidate;
-        }
-      }
-      return largest;
-    }
-
-    /**
-     * Returns the unweighted mean of the readers' average key sizes.
-     */
-    @Override
-    public double avgKeySize() {
-      double sum = 0;
-      for (SortedStatistics each : stats) {
-        sum += each.avgKeySize();
-      }
-      return sum / stats.size();
-    }
-
-    /**
-     * Returns the unweighted mean of the readers' average value sizes.
-     */
-    @Override
-    public double avgValueSize() {
-      double sum = 0;
-      for (SortedStatistics each : stats) {
-        sum += each.avgValueSize();
-      }
-      return sum / stats.size();
-    }
-
-    /**
-     * Releases the tracked soplog references.
-     */
-    @Override
-    public void close() {
-      TrackedReference.decrementAll(soplogs);
-    }
-  }
-
-  /**
-   * Provides ordered iteration across a set of sorted data sets.
-   * <p>
-   * Each step returns the smallest key among the backing iterators according
-   * to the supplied comparator. When several iterators are positioned on an
-   * equal key, the one with the lowest index in the list supplies the value
-   * and the others are advanced past that key.
-   */
-  public static class MergedIterator
-      extends AbstractKeyValueIterator<ByteBuffer, ByteBuffer>
-      implements SortedIterator<ByteBuffer>
-  {
-    /** the comparison operator */
-    private final SerializedComparator comparator;
-
-    /** the reference counted soplogs */
-    private final Collection<TrackedReference<SortedOplogReader>> soplogs;
-
-    /** the backing iterators; exhausted iterators are closed and removed */
-    private final List<SortedIterator<ByteBuffer>> iters;
-
-    /** the current key */
-    private ByteBuffer key;
-
-    /** the current value */
-    private ByteBuffer value;
-
-    /**
-     * Creates the iterator and positions every backing iterator on its first
-     * entry; iterators with no entries are closed and removed from the list.
-     *
-     * @param comparator orders the keys
-     * @param soplogs the tracked references released by {@link #close()}
-     * @param iters the backing iterators; the list is modified by this class
-     */
-    public MergedIterator(SerializedComparator comparator,
-        Collection<TrackedReference<SortedOplogReader>> soplogs,
-        List<SortedIterator<ByteBuffer>> iters) {
-      this.comparator = comparator;
-      this.soplogs = soplogs;
-      this.iters = iters;
-
-      // initialize iteration positions
-      int i = 0;
-      while (i < iters.size()) {
-        i = advance(i);
-      }
-    }
-
-    @Override
-    public ByteBuffer key() {
-      return key;
-    }
-
-    @Override
-    public ByteBuffer value() {
-      return value;
-    }
-
-    /**
-     * Selects the next smallest key and its value.
-     *
-     * @return false if no iterators remain or any tracked reader is closed
-     */
-    @Override
-    protected boolean step() {
-      if (iters.isEmpty() || readerIsClosed()) {
-        return false;
-      }
-
-      int cursor = 0;
-      key = iters.get(cursor).key();
-
-      int i = 1;
-      while (i < iters.size()) {
-        ByteBuffer tmp = iters.get(i).key();
-
-        int diff = comparator.compare(tmp.array(), tmp.arrayOffset(), tmp.remaining(),
-            key.array(), key.arrayOffset(), key.remaining());
-        if (diff < 0) {
-          // found a smaller key: it becomes the candidate
-          cursor = i++;
-          key = tmp;
-
-        } else if (diff == 0) {
-          // same key as the candidate: skip it (advance may remove index i)
-          i = advance(i);
-
-        } else {
-          i++;
-        }
-      }
-
-      // capture the value before moving the winning iterator forward
-      value = iters.get(cursor).value();
-      advance(cursor);
-
-      return true;
-    }
-
-    /**
-     * Closes the remaining backing iterators and releases the tracked soplog
-     * references. The references are released even if closing an iterator
-     * throws.
-     */
-    @Override
-    public void close() {
-      try {
-        for (SortedIterator<ByteBuffer> iter : iters) {
-          iter.close();
-        }
-      } finally {
-        TrackedReference.decrementAll(soplogs);
-      }
-    }
-
-    /**
-     * Moves the iterator at the given index to its next entry, or closes and
-     * removes it if it is exhausted.
-     *
-     * @param idx the index of the iterator to advance
-     * @return the index of the next iterator to visit: {@code idx} if the
-     *         iterator was removed, otherwise {@code idx + 1}
-     */
-    private int advance(int idx) {
-      // either advance the cursor or remove the iterator
-      if (!iters.get(idx).hasNext()) {
-        iters.remove(idx).close();
-        return idx;
-      }
-      iters.get(idx).next();
-      return idx + 1;
-    }
-
-    /**
-     * @return true if any tracked soplog reader reports that it is closed
-     */
-    private boolean readerIsClosed() {
-      for (TrackedReference<SortedOplogReader> tr : soplogs) {
-        if (tr.get().isClosed()) {
-          return true;
-        }
-      }
-      return false;
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9438c8b1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/BlockCacheHolder.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/BlockCacheHolder.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/BlockCacheHolder.java
deleted file mode 100644
index eb5154c..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/BlockCacheHolder.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog.hfile;
-
-import org.apache.hadoop.hbase.io.hfile.BlockCache;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.HFileStoreStatistics;
-
-/**
- * Pairs an HFile block cache with the store statistics passed alongside it.
- */
-public class BlockCacheHolder {
-  /** the block cache */
-  private BlockCache cache;
-
-  /** the statistics supplied together with the cache */
-  private HFileStoreStatistics stats;
-
-  /**
-   * @param stats the store statistics
-   * @param cache the block cache
-   */
-  public BlockCacheHolder(HFileStoreStatistics stats, BlockCache cache) {
-    this.cache = cache;
-    this.stats = stats;
-  }
-
-  /**
-   * @return the block cache given to the constructor
-   */
-  public synchronized BlockCache getBlockCache() {
-    return this.cache;
-  }
-
-  /**
-   * @return the store statistics given to the constructor
-   */
-  public synchronized HFileStoreStatistics getHFileStoreStats() {
-    return this.stats;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9438c8b1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/HFileSortedOplog.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/HFileSortedOplog.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/HFileSortedOplog.java
deleted file mode 100644
index 56c6960..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/HFileSortedOplog.java
+++ /dev/null
@@ -1,694 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog.hfile;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.Map.Entry;
-import java.util.NoSuchElementException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.AbstractSortedReader;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.DelegatingSerializedComparator;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.ReversingSerializedComparator;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedBuffer.BufferIterator;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplog;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogFactory.SortedOplogConfiguration;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.Metadata;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SerializedComparator;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SortedIterator;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SortedStatistics;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.util.Bytes;
-import com.gemstone.gemfire.internal.util.Hex;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
-import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
-import org.apache.hadoop.hbase.io.hfile.HFileScanner;
-import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
-import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
-import org.apache.hadoop.hbase.util.BloomFilterFactory;
-import org.apache.hadoop.hbase.util.BloomFilterWriter;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.logging.log4j.Logger;
-import com.gemstone.gemfire.internal.logging.LogService;
-
-/**
- * Provides a soplog backed by an HFile.
- *
- * @author bakera
- */
-public class HFileSortedOplog implements SortedOplog {
- public static final byte[] MAGIC = new byte[] { 0x53, 0x4F, 0x50 };
- public static final byte[] VERSION_1 = new byte[] { 0x1 };
-
- // FileInfo is not visible
- private static final byte[] AVG_KEY_LEN = "hfile.AVG_KEY_LEN".getBytes();
- private static final byte[] AVG_VALUE_LEN = "hfile.AVG_VALUE_LEN".getBytes();
-
- /** a default bloom filter */
- private static final BloomFilter DUMMY_BLOOM = new BloomFilter() {
- @Override
- public boolean mightContain(byte[] key) {
- return true;
- }
- };
-
- static final Configuration hconf;
- private static final FileSystem fs;
-
- // Initializes the shared Hadoop configuration and file system used by every
- // HFileSortedOplog instance; a failure to obtain the file system is rethrown
- // as an IllegalStateException.
- static {
- // Leave these HBase properties set to defaults for now
- //
- // hfile.block.cache.size (25% of heap)
- // hbase.hash.type (murmur)
- // hfile.block.index.cacheonwrite (false)
- // hfile.index.block.max.size (128k)
- // hfile.format.version (2)
- // io.storefile.bloom.block.size (128k)
- // hfile.block.bloom.cacheonwrite (false)
- // hbase.rs.cacheblocksonwrite (false)
- // hbase.offheapcache.minblocksize (64k)
- // hbase.offheapcache.percentage (0)
- hconf = new Configuration();
-
- // the property is set before the schema metrics are configured from hconf
- hconf.setBoolean("hbase.metrics.showTableName", true);
- SchemaMetrics.configureGlobally(hconf);
-
- try {
- fs = FileSystem.get(hconf);
- } catch (IOException e) {
- throw new IllegalStateException(e);
- }
- }
-
-  /**
-   * Names of the metadata entries that the soplog implementation itself
-   * stores in the file.
-   */
-  private static enum InternalMetadata {
-    /** identifies the soplog as a gemfire file, required */
-    GEMFIRE_MAGIC,
-
-    /** identifies the soplog version, required */
-    VERSION,
-
-    /** identifies the statistics data */
-    STATISTICS,
-
-    /** identifies the names of embedded comparators */
-    COMPARATORS;
-
-    /**
-     * Returns the metadata key for this entry: the enum name prefixed with
-     * {@code "gemfire."}. The key is encoded with a fixed charset so that the
-     * bytes do not depend on the platform default encoding of the JVM that
-     * reads or writes the file.
-     *
-     * @return the encoded key
-     */
-    public byte[] bytes() {
-      return ("gemfire." + name()).getBytes(java.nio.charset.Charset.forName("UTF-8"));
-    }
-  }
-
- //logger instance
- private static final Logger logger = LogService.getLogger();
- protected final String logPrefix;
-
- /** the configuration */
- private final SortedOplogConfiguration sopConfig;
-
- /** the hfile cache config */
- private final CacheConfig hcache;
-
- /** the hfile location */
- private Path path;
-
-  /**
-   * Creates a soplog backed by the given file. The file location is
-   * qualified against the shared file system.
-   *
-   * @param hfile the file location, must not be null
-   * @param sopConfig the soplog configuration, must not be null
-   * @throws IOException if the soplog cannot be set up
-   */
-  public HFileSortedOplog(File hfile, SortedOplogConfiguration sopConfig) throws IOException {
-    assert hfile != null;
-    assert sopConfig != null;
-
-    this.sopConfig = sopConfig;
-
-    Path unqualified = new Path(hfile.toString());
-    this.path = fs.makeQualified(unqualified);
-
-    // cache configuration with block cache and statistics, currently disabled:
-//  hcache = new CacheConfig(hconf, sopConfig.getCacheDataBlocksOnRead(), sopConfig.getBlockCache(),
-//      HFileSortedOplogFactory.convertStatistics(sopConfig.getStatistics(), sopConfig.getStoreStatistics()));
-    this.hcache = new CacheConfig(hconf);
-    this.logPrefix = "<" + sopConfig.getName() + "> ";
-  }
-
-  /**
-   * Opens a new reader on the HFile.
-   *
-   * @return the reader
-   * @throws IOException if the reader cannot be created
-   */
-  @Override
-  public SortedOplogReader createReader() throws IOException {
-    if (logger.isDebugEnabled()) {
-      logger.debug("{}Creating an HFile reader on " + path, logPrefix);
-    }
-
-    SortedOplogReader opened = new HFileSortedOplogReader();
-    return opened;
-  }
-
-  /**
-   * Opens a new writer on the HFile.
-   *
-   * @return the writer
-   * @throws IOException if the writer cannot be created
-   */
-  @Override
-  public SortedOplogWriter createWriter() throws IOException {
-    if (logger.isDebugEnabled()) {
-      logger.debug("{}Creating an HFile writer on " + path, logPrefix);
-    }
-
-    SortedOplogWriter opened = new HFileSortedOplogWriter();
-    return opened;
-  }
-
-  /**
-   * @return the soplog configuration given to the constructor
-   */
-  SortedOplogConfiguration getConfiguration() {
-    return this.sopConfig;
-  }
-
- private class HFileSortedOplogReader extends AbstractSortedReader implements SortedOplogReader {
- private final Reader reader;
- private final BloomFilter bloom;
- private final SortedStatistics stats;
- private volatile boolean closed;
-
-    /**
-     * Opens the HFile, validates it, and prepares the statistics and the
-     * bloom filter. When the file has no general bloom filter metadata, a
-     * filter that accepts every key is used.
-     *
-     * @throws IOException if the file cannot be opened or fails validation
-     */
-    public HFileSortedOplogReader() throws IOException {
-      reader = HFile.createReader(fs, path, hcache);
-      validate();
-
-      stats = new HFileSortedStatistics(reader);
-      closed = false;
-
-      if (reader.getComparator() instanceof DelegatingSerializedComparator) {
-        loadComparators((DelegatingSerializedComparator) reader.getComparator());
-      }
-
-      DataInput bin = reader.getGeneralBloomFilterMetadata();
-      if (bin != null) {
-        final org.apache.hadoop.hbase.util.BloomFilter hbloom = BloomFilterFactory.createFromMeta(bin, reader);
-        // test the comparator that is actually cast below; checking the reader's
-        // comparator here could end in a ClassCastException
-        if (hbloom.getComparator() instanceof DelegatingSerializedComparator) {
-          loadComparators((DelegatingSerializedComparator) hbloom.getComparator());
-        }
-
-        bloom = new BloomFilter() {
-          @Override
-          public boolean mightContain(byte[] key) {
-            assert key != null;
-
-            long start = sopConfig.getStatistics().getBloom().begin();
-            boolean foundKey = hbloom.contains(key, 0, key.length, null);
-            sopConfig.getStatistics().getBloom().end(start);
-
-            if (logger.isTraceEnabled()) {
-              // the leading {} is filled with logPrefix by the logger
-              logger.trace(String.format("{}Bloom check on %s for key %s: %b",
-                  path, Hex.toHex(key), foundKey), logPrefix);
-            }
-            return foundKey;
-          }
-        };
-
-      } else {
-        bloom = DUMMY_BLOOM;
-      }
-    }
-
-    /**
-     * Delegates the membership test to this reader's bloom filter.
-     *
-     * @param key the serialized key
-     * @return the bloom filter's answer
-     */
-    @Override
-    public boolean mightContain(byte[] key) {
-      BloomFilter filter = getBloomFilter();
-      return filter.mightContain(key);
-    }
-
-    /**
-     * Reads the value stored for a key. A miss is recorded as a bloom filter
-     * false positive in the statistics.
-     *
-     * @param key the serialized key, must not be null
-     * @return the value, or null if the scanner does not find an exact match
-     * @throws IOException if the read fails
-     */
-    @Override
-    public ByteBuffer read(byte[] key) throws IOException {
-      assert key != null;
-
-      if (logger.isTraceEnabled()) {
-        logger.trace(String.format("{}Reading key %s from %s", Hex.toHex(key), path), logPrefix);
-      }
-
-      final long began = sopConfig.getStatistics().getRead().begin();
-      try {
-        HFileScanner seek = reader.getScanner(true, true);
-        if (seek.seekTo(key) != 0) {
-          // no exact match for the key
-          sopConfig.getStatistics().getRead().end(began);
-          sopConfig.getStatistics().getBloom().falsePositive();
-          return null;
-        }
-
-        ByteBuffer found = seek.getValue();
-        sopConfig.getStatistics().getRead().end(found.remaining(), began);
-        return found;
-
-      } catch (IOException failure) {
-        sopConfig.getStatistics().getRead().error(began);
-        throw (IOException) failure.fillInStackTrace();
-      }
-    }
-
-    /**
-     * Creates an iterator over the requested key range. If a filter is given
-     * and rejects this file's metadata, an empty iterator is returned.
-     *
-     * @param from the start key
-     * @param fromInclusive true if the start key is included
-     * @param to the end key
-     * @param toInclusive true if the end key is included
-     * @param ascending true for ascending order; otherwise the comparator is reversed
-     * @param filter optional metadata filter, may be null
-     * @return the iterator
-     * @throws IOException if the metadata or scanner cannot be obtained
-     */
-    @Override
-    public SortedIterator<ByteBuffer> scan(
-        byte[] from, boolean fromInclusive,
-        byte[] to, boolean toInclusive,
-        boolean ascending,
-        MetadataFilter filter) throws IOException {
-      if (filter != null && !filter.accept(getMetadata(filter.getName()))) {
-        // rejected by the filter: nothing to iterate
-        return new BufferIterator(Collections.<byte[], byte[]>emptyMap().entrySet().iterator());
-      }
-
-      SerializedComparator order = (SerializedComparator) reader.getComparator();
-      if (!ascending) {
-        order = ReversingSerializedComparator.reverse(order);
-      }
-
-//    HFileScanner scan = reader.getScanner(true, false, ascending, false);
-      HFileScanner scanner = reader.getScanner(true, false, false);
-      return new HFileSortedIterator(scanner, order, from, fromInclusive, to, toInclusive);
-    }
-
- /**
-  * Returns the comparator of the underlying HFile reader, cast to a serialized comparator.
-  *
-  * @return the reader's comparator
-  */
- @Override
- public SerializedComparator getComparator() {
-   final SerializedComparator fromReader = (SerializedComparator) reader.getComparator();
-   return fromReader;
- }
-
- /**
-  * Returns the statistics object held by this reader.
-  *
-  * @return the value of the {@code stats} field
-  */
- @Override
- public SortedStatistics getStatistics() {
-   return this.stats;
- }
-
- /**
-  * Reports whether {@link #close()} has completed on this reader.
-  *
-  * @return true once the reader has been closed
-  */
- @Override
- public boolean isClosed() {
-   return this.closed;
- }
-
- /**
-  * Closes the underlying HFile reader and then marks this reader as closed.
-  *
-  * The closed flag is only set when the underlying close returns normally.
-  *
-  * @throws IOException if the underlying reader fails to close
-  */
- @Override
- public void close() throws IOException {
-   final boolean debug = logger.isDebugEnabled();
-   if (debug) {
-     logger.debug("{}Closing reader on " + path, logPrefix);
-   }
-
-   reader.close();
-   this.closed = true;
- }
-
- /**
-  * Returns the bloom filter held by this reader.
-  *
-  * @return the value of the {@code bloom} field
-  */
- @Override
- public BloomFilter getBloomFilter() {
-   return this.bloom;
- }
-
- /**
-  * Reads a user metadata entry from the HFile's file-info block.
-  *
-  * @param name the metadata entry to read; must not be null
-  * @return the bytes stored under {@code name.bytes()}, as returned by the file-info lookup
-  * @throws IOException if the file info cannot be loaded
-  */
- @Override
- public byte[] getMetadata(Metadata name) throws IOException {
-   assert name != null;
-
-   return reader
-       .loadFileInfo()
-       .get(name.bytes());
- }
-
- /**
-  * Returns this soplog's path as a {@link File}, built from the path's URI.
-  *
-  * @return a new File for the current path
-  */
- @Override
- public File getFile() {
-   final File asFile = new File(path.toUri());
-   return asFile;
- }
-
- /**
-  * Returns the name component of this soplog's current path.
-  *
-  * @return the result of {@code path.getName()}
-  */
- @Override
- public String getFileName() {
-   final String fileName = path.getName();
-   return fileName;
- }
-
- /**
-  * Returns the modification time of the file at the current path.
-  *
-  * @return the modification time when the listing yields exactly one status entry,
-  *         otherwise 0
-  * @throws IOException if the file status cannot be listed
-  */
- @Override
- public long getModificationTimeStamp() throws IOException {
-   // named "listing" so it does not shadow the reader's stats field
-   final FileStatus[] listing = FSUtils.listStatus(fs, path, null);
-   final boolean exactlyOne = listing != null && listing.length == 1;
-   return exactlyOne ? listing[0].getModificationTime() : 0;
- }
-
- /**
-  * Renames the soplog file within its parent directory and repoints this reader at it.
-  *
-  * @param name the new file name, resolved against the current parent directory
-  * @throws IOException if the file system raises an error or reports that the rename
-  *         did not happen; in both cases the reader keeps its previous path
-  */
- @Override
- public void rename(String name) throws IOException {
-   Path parent = path.getParent();
-   Path newPath = new Path(parent, name);
-
-   // The file system reports an unsuccessful rename through its boolean result rather
-   // than an exception; ignoring it would leave this reader pointing at a missing file.
-   if (!fs.rename(path, newPath)) {
-     throw new IOException("Failed to rename " + path + " to " + newPath);
-   }
-
-   // update path to point to the new path
-   path = newPath;
- }
-
- /**
-  * Deletes the file at the current path, non-recursively.
-  *
-  * NOTE(review): the boolean result of the delete call is discarded, so a delete
-  * that did not happen is not reported - confirm this is intended as best-effort.
-  *
-  * @throws IOException if the file system raises an error
-  */
- @Override
- public void delete() throws IOException {
-   final boolean recursive = false;
-   fs.delete(path, recursive);
- }
-
- /**
-  * Describes this reader by its current path.
-  *
-  * @return the string form of the path
-  */
- @Override
- public String toString() {
-   final String description = path.toString();
-   return description;
- }
-
- /**
-  * Reads an internal (system) metadata entry from the HFile's file-info block.
-  *
-  * @param name the internal metadata entry to read
-  * @return the bytes stored under {@code name.bytes()}, as returned by the file-info lookup
-  * @throws IOException if the file info cannot be loaded
-  */
- private byte[] getMetadata(InternalMetadata name) throws IOException {
-   return reader
-       .loadFileInfo()
-       .get(name.bytes());
- }
-
- private void validate() throws IOException {
- // check magic
- byte[] magic = getMetadata(InternalMetadata.GEMFIRE_MAGIC);
- if (!Arrays.equals(magic, MAGIC)) {
- throw new IOException(LocalizedStrings.Soplog_INVALID_MAGIC.toLocalizedString(Hex.toHex(magic)));
- }
-
- // check version compatibility
- byte[] ver = getMetadata(InternalMetadata.VERSION);
- if (logger.isDebugEnabled()) {
- logger.debug("{}Soplog version is " + Hex.toHex(ver), logPrefix);
- }
-
- if (!Arrays.equals(ver, VERSION_1)) {
- throw new IOException(LocalizedStrings.Soplog_UNRECOGNIZED_VERSION.toLocalizedString(Hex.toHex(ver)));
- }
- }
-
- private void loadComparators(DelegatingSerializedComparator comparator) throws IOException {
- byte[] raw = reader.loadFileInfo().get(InternalMetadata.COMPARATORS.bytes());
- assert raw != null;
-
- DataInput in = new DataInputStream(new ByteArrayInputStream(raw));
- comparator.setComparators(readComparators(in));
- }
-
- /**
-  * Reads a comparator list from the stream: an int count followed by one class name per
-  * comparator, each instantiated reflectively. A delegating comparator is immediately
-  * followed by its own nested list, which is read recursively.
-  *
-  * NOTE(review): class names come straight from the file and are passed to
-  * {@code Class.forName}; confirm soplog files are always trusted input.
-  *
-  * @param in the stream positioned at the start of a comparator list
-  * @return the instantiated comparators, in stored order
-  * @throws IOException if the stream cannot be read, or wrapping any failure to load,
-  *         instantiate or cast a comparator class
-  */
- private SerializedComparator[] readComparators(DataInput in) throws IOException {
-   try {
-     SerializedComparator[] comps = new SerializedComparator[in.readInt()];
-     assert comps.length > 0;
-
-     for (int i = 0; i < comps.length; i++) {
-       comps[i] = (SerializedComparator) Class.forName(in.readUTF()).newInstance();
-       if (comps[i] instanceof DelegatingSerializedComparator) {
-         ((DelegatingSerializedComparator) comps[i]).setComparators(readComparators(in));
-       }
-     }
-     return comps;
-
-   } catch (IOException e) {
-     // Already the declared type: pass it through so it is not wrapped again at
-     // every level of the recursion.
-     throw e;
-
-   } catch (Exception e) {
-     throw new IOException(e);
-   }
- }
- }
-
- private class HFileSortedOplogWriter implements SortedOplogWriter {
- private final Writer writer;
- private final BloomFilterWriter bfw;
-
- public HFileSortedOplogWriter() throws IOException {
- writer = HFile.getWriterFactory(hconf, hcache)
- .withPath(fs, path)
- .withBlockSize(sopConfig.getBlockSize())
- .withBytesPerChecksum(sopConfig.getBytesPerChecksum())
- .withChecksumType(HFileSortedOplogFactory.convertChecksum(sopConfig.getChecksum()))
-// .withComparator(sopConfig.getComparator())
- .withCompression(HFileSortedOplogFactory.convertCompression(sopConfig.getCompression()))
- .withDataBlockEncoder(HFileSortedOplogFactory.convertEncoding(sopConfig.getKeyEncoding()))
- .create();
-
- bfw = sopConfig.isBloomFilterEnabled() ?
-// BloomFilterFactory.createGeneralBloomAtWrite(hconf, hcache, BloomType.ROW,
-// 0, writer, sopConfig.getComparator())
- BloomFilterFactory.createGeneralBloomAtWrite(hconf, hcache, BloomType.ROW,
- 0, writer)
- : null;
- }
-
- @Override
- public void append(byte[] key, byte[] value) throws IOException {
- assert key != null;
- assert value != null;
-
- if (logger.isTraceEnabled()) {
- logger.trace(String.format("{}Appending key %s to %s", Hex.toHex(key), path), logPrefix);
- }
-
- try {
- writer.append(key, value);
- if (bfw != null) {
- bfw.add(key, 0, key.length);
- }
- } catch (IOException e) {
- throw (IOException) e.fillInStackTrace();
- }
- }
-
- @Override
- public void append(ByteBuffer key, ByteBuffer value) throws IOException {
- assert key != null;
- assert value != null;
-
- if (logger.isTraceEnabled()) {
- logger.trace(String.format("{}Appending key %s to %s",
- Hex.toHex(key.array(), key.arrayOffset(), key.remaining()), path), logPrefix);
- }
-
- try {
- byte[] keyBytes = new byte[key.remaining()];
- key.duplicate().get(keyBytes);
- byte[] valueBytes = new byte[value.remaining()];
- value.duplicate().get(valueBytes);
- writer.append(keyBytes, valueBytes);
- if (bfw != null) {
- bfw.add(key.array(), key.arrayOffset(), key.remaining());
- }
- } catch (IOException e) {
- throw (IOException) e.fillInStackTrace();
- }
- }
-
- @Override
- public void close(EnumMap<Metadata, byte[]> metadata) throws IOException {
- if (logger.isTraceEnabled()) {
- logger.debug("{}Finalizing and closing writer on " + path, logPrefix);
- }
-
- if (bfw != null) {
- bfw.compactBloom();
- writer.addGeneralBloomFilter(bfw);
- }
-
- // append system metadata
- writer.appendFileInfo(InternalMetadata.GEMFIRE_MAGIC.bytes(), MAGIC);
- writer.appendFileInfo(InternalMetadata.VERSION.bytes(), VERSION_1);
-
- // append comparator info
-// if (writer.getComparator() instanceof DelegatingSerializedComparator) {
-// ByteArrayOutputStream bos = new ByteArrayOutputStream();
-// DataOutput out = new DataOutputStream(bos);
-//
-// writeComparatorInfo(out, ((DelegatingSerializedComparator) writer.getComparator()).getComparators());
-// writer.appendFileInfo(InternalMetadata.COMPARATORS.bytes(), bos.toByteArray());
-// }
-
- // TODO write statistics data to soplog
- // writer.appendFileInfo(Meta.STATISTICS.toBytes(), null);
-
- // append user metadata
- if (metadata != null) {
- for (Entry<Metadata, byte[]> entry : metadata.entrySet()) {
- writer.appendFileInfo(entry.getKey().name().getBytes(), entry.getValue());
- }
- }
-
- writer.close();
- }
-
- @Override
- public void closeAndDelete() throws IOException {
- if (logger.isTraceEnabled()) {
- logger.debug("{}Closing writer and deleting " + path, logPrefix);
- }
-
- writer.close();
- new File(writer.getPath().toUri()).delete();
- }
-
-// private void writeComparatorInfo(DataOutput out, SerializedComparator[] comparators) throws IOException {
-// out.writeInt(comparators.length);
-// for (SerializedComparator sc : comparators) {
-// out.writeUTF(sc.getClass().getName());
-// if (sc instanceof DelegatingSerializedComparator) {
-// writeComparatorInfo(out, ((DelegatingSerializedComparator) sc).getComparators());
-// }
-// }
-// }
- }
-
- /**
-  * Iterates over the entries of an HFile scanner that lie between an optional lower
-  * bound ({@code from}) and an optional upper bound ({@code to}), recording scan
-  * statistics for the whole scan and for each returned entry.
-  */
- private class HFileSortedIterator implements SortedIterator<ByteBuffer> {
-   private final HFileScanner scan;
-   private final SerializedComparator comparator;
-
-   private final byte[] from;
-   private final boolean fromInclusive;
-
-   private final byte[] to;
-   private final boolean toInclusive;
-
-   /** Token returned when the scan statistic was started; passed back in close(). */
-   private final long start;
-
-   /** Total key and value bytes returned so far. */
-   private long bytes;
-
-   /** True when the scanner is positioned on an in-range entry not yet returned. */
-   private boolean foundNext;
-
-   private ByteBuffer key;
-   private ByteBuffer value;
-
-   /**
-    * Positions the scanner on the first entry of the requested range.
-    *
-    * @param scan the scanner to iterate with
-    * @param comparator the comparator used for the upper-bound check
-    * @param from lower bound, or null to start at the first entry
-    * @param fromInclusive whether an entry equal to {@code from} is returned
-    * @param to upper bound, or null for no upper bound
-    * @param toInclusive whether an entry equal to {@code to} is returned
-    * @throws IOException if the initial seek fails
-    */
-   public HFileSortedIterator(HFileScanner scan, SerializedComparator comparator,
-       byte[] from, boolean fromInclusive,
-       byte[] to, boolean toInclusive) throws IOException {
-     this.scan = scan;
-     this.comparator = comparator;
-     this.from = from;
-     this.fromInclusive = fromInclusive;
-     this.to = to;
-     this.toInclusive = toInclusive;
-
-     assert from == null
-         || to == null
-         || comparator.compare(from, 0, from.length, to, 0, to.length) <= 0;
-
-     start = sopConfig.getStatistics().getScan().begin();
-     foundNext = evalFrom();
-   }
-
-   /** Returns the key of the entry most recently returned by {@link #next()}. */
-   @Override
-   public ByteBuffer key() {
-     return key;
-   }
-
-   /** Returns the value of the entry most recently returned by {@link #next()}. */
-   @Override
-   public ByteBuffer value() {
-     return value;
-   }
-
-   /**
-    * Advances the scanner if it is not already on an unreturned in-range entry.
-    *
-    * @return true when another entry is available
-    */
-   @Override
-   public boolean hasNext() {
-     if (!foundNext) {
-       foundNext = step();
-     }
-     return foundNext;
-   }
-
-   /**
-    * Returns the key of the next entry and makes its value available through
-    * {@link #value()}.
-    *
-    * @return the key of the next entry
-    * @throws NoSuchElementException if the range is exhausted
-    */
-   @Override
-   public ByteBuffer next() {
-     long startNext = sopConfig.getStatistics().getScan().beginIteration();
-
-     if (!hasNext()) {
-       throw new NoSuchElementException();
-     }
-
-     foundNext = false;
-     key = scan.getKey();
-     value = scan.getValue();
-
-     int len = key.remaining() + value.remaining();
-     bytes += len;
-     sopConfig.getStatistics().getScan().endIteration(len, startNext);
-
-     return key;
-   }
-
-   /** Not supported; always throws {@link UnsupportedOperationException}. */
-   @Override
-   public void remove() {
-     throw new UnsupportedOperationException();
-   }
-
-   /** Ends the scan statistic with the number of bytes returned. */
-   @Override
-   public void close() {
-     sopConfig.getStatistics().getScan().end(bytes, start);
-   }
-
-   /**
-    * Moves the scanner to the following entry.
-    *
-    * @return true when the scanner was seeked, had a following entry, and that entry
-    *         is within the upper bound
-    */
-   private boolean step() {
-     try {
-       return scan.isSeeked() && scan.next() && evalTo();
-     } catch (IOException e) {
-       throw new HDFSIOException("Error from HDFS during iteration", e);
-     }
-   }
-
-   /**
-    * Positions the scanner on the first entry satisfying the lower bound.
-    *
-    * @return true when that entry exists and is within the upper bound
-    * @throws IOException if seeking fails
-    */
-   private boolean evalFrom() throws IOException {
-     if (from == null) {
-       return scan.seekTo() && evalTo();
-     }
-
-     int compare = scan.seekTo(from);
-     if (compare < 0) {
-       // negative seek result: restart from the first entry
-       return scan.seekTo() && evalTo();
-     }
-
-     if (compare == 0 && fromInclusive) {
-       // Exact match on the lower bound. The upper bound still applies: when
-       // from equals to and to is exclusive the range is empty.
-       return evalTo();
-     }
-
-     // Either the match is excluded or the seek did not report an exact match;
-     // the first candidate is the following entry.
-     return step();
-   }
-
-   /**
-    * Checks the scanner's current key against the upper bound.
-    *
-    * @return true when there is no upper bound, the key sorts before it, or the key
-    *         equals it and the bound is inclusive
-    * @throws IOException declared for the callers; the visible calls do not declare one
-    */
-   private boolean evalTo() throws IOException {
-     int compare = -1;
-     if (to != null) {
-       ByteBuffer current = scan.getKey();
-       // the readable bytes start at arrayOffset() + position(), not arrayOffset()
-       compare = comparator.compare(
-           current.array(), current.arrayOffset() + current.position(), current.remaining(),
-           to, 0, to.length);
-     }
-
-     return compare < 0 || (compare == 0 && toInclusive);
-   }
- }
-
- /**
-  * Statistics for a soplog, taken from an HFile reader. The average key and value
-  * lengths are read from the file info once at construction; the remaining values are
-  * fetched from the reader on each call.
-  */
- private static class HFileSortedStatistics implements SortedStatistics {
-   private final Reader reader;
-   private final int keySize;
-   private final int valueSize;
-
-   /**
-    * Reads the average key and value lengths from the reader's file info.
-    *
-    * @param reader the HFile reader backing these statistics
-    * @throws IOException if the file info cannot be loaded
-    */
-   public HFileSortedStatistics(Reader reader) throws IOException {
-     this.reader = reader;
-
-     // each length is decoded from the first four bytes of its file-info entry
-     final byte[] rawKeyLen = reader.loadFileInfo().get(AVG_KEY_LEN);
-     this.keySize = Bytes.toInt(rawKeyLen[0], rawKeyLen[1], rawKeyLen[2], rawKeyLen[3]);
-
-     final byte[] rawValueLen = reader.loadFileInfo().get(AVG_VALUE_LEN);
-     this.valueSize = Bytes.toInt(rawValueLen[0], rawValueLen[1], rawValueLen[2], rawValueLen[3]);
-   }
-
-   /** Returns the entry count reported by the reader. */
-   @Override
-   public long keyCount() {
-     final long entries = reader.getEntries();
-     return entries;
-   }
-
-   /** Returns the first key reported by the reader. */
-   @Override
-   public byte[] firstKey() {
-     final byte[] first = reader.getFirstKey();
-     return first;
-   }
-
-   /** Returns the last key reported by the reader. */
-   @Override
-   public byte[] lastKey() {
-     final byte[] last = reader.getLastKey();
-     return last;
-   }
-
-   /** Returns the average key length read at construction. */
-   @Override
-   public double avgKeySize() {
-     return this.keySize;
-   }
-
-   /** Returns the average value length read at construction. */
-   @Override
-   public double avgValueSize() {
-     return this.valueSize;
-   }
-
-   /** Does nothing. */
-   @Override
-   public void close() {
-   }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9438c8b1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/HFileSortedOplogFactory.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/HFileSortedOplogFactory.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/HFileSortedOplogFactory.java
deleted file mode 100644
index 9546fd3..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/HFileSortedOplogFactory.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog.hfile;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.io.hfile.BlockCache;
-import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
-import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
-import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
-import org.apache.hadoop.hbase.util.ChecksumType;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.HFileStoreStatistics;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplog;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogFactory;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogFactory.SortedOplogConfiguration.Checksum;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogFactory.SortedOplogConfiguration.Compression;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogFactory.SortedOplogConfiguration.KeyEncoding;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics;
-
-/**
- * Creates HFile soplogs.
- *
- * @author bakera
- */
-public class HFileSortedOplogFactory implements SortedOplogFactory {
-  /** Configuration shared by every soplog this factory creates. */
-  private final SortedOplogConfiguration config;
-
-  /**
-   * Builds the shared configuration from the given name, block cache and statistics.
-   *
-   * @param name passed through to the configuration
-   * @param blockCache passed through to the configuration
-   * @param stats passed through to the configuration
-   * @param storeStats passed through to the configuration
-   */
-  public HFileSortedOplogFactory(String name, BlockCache blockCache, SortedOplogStatistics stats, HFileStoreStatistics storeStats) {
-    this.config = new SortedOplogConfiguration(name, blockCache, stats, storeStats);
-  }
-
-  /** Returns the configuration created by the constructor. */
-  @Override
-  public SortedOplogConfiguration getConfiguration() {
-    return this.config;
-  }
-
-  /**
-   * Creates an HFile-backed soplog for the given file using this factory's configuration.
-   *
-   * @param name the file backing the soplog
-   * @return a new soplog
-   * @throws IOException if the soplog cannot be created
-   */
-  @Override
-  public SortedOplog createSortedOplog(File name) throws IOException {
-    final SortedOplog created = new HFileSortedOplog(name, config);
-    return created;
-  }
-
-  /**
-   * Maps a soplog checksum setting to the HBase checksum type.
-   *
-   * @param type the soplog checksum setting
-   * @return {@code ChecksumType.NULL} for {@code NONE}; {@code ChecksumType.CRC32} for
-   *         {@code CRC32} and any other value
-   */
-  public static ChecksumType convertChecksum(Checksum type) {
-    switch (type) {
-      case NONE:
-        return ChecksumType.NULL;
-
-      case CRC32:
-      default:
-        return ChecksumType.CRC32;
-    }
-  }
-
-  /**
-   * Maps a soplog compression setting to the HBase compression algorithm.
-   *
-   * @param type the soplog compression setting
-   * @return {@code Algorithm.NONE} for every value
-   */
-  public static Algorithm convertCompression(Compression type) {
-    switch (type) {
-      case NONE:
-      default:
-        return Algorithm.NONE;
-    }
-  }
-
-  /**
-   * Maps a soplog key-encoding setting to the HBase data block encoder.
-   *
-   * @param type the soplog key-encoding setting
-   * @return {@code NoOpDataBlockEncoder.INSTANCE} for every value
-   */
-  public static HFileDataBlockEncoder convertEncoding(KeyEncoding type) {
-    switch (type) {
-      case NONE:
-      default:
-        return NoOpDataBlockEncoder.INSTANCE;
-    }
-  }
-}