You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ma...@apache.org on 2015/11/23 21:48:52 UTC

[22/50] [abbrv] incubator-geode git commit: GEODE-544: Removes soplog code and tests

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9438c8b1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SoplogToken.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SoplogToken.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SoplogToken.java
deleted file mode 100644
index 73175e7..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SoplogToken.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import com.gemstone.gemfire.internal.DSCODE;
-import com.gemstone.gemfire.internal.cache.EntryBits;
-
-/**
- * Defines serialized tokens for soplogs. 
- */
-public enum SoplogToken {
-  
-  /** indicates the serialized value is a wildcard compares equal to any other key */
-  WILDCARD( DSCODE.WILDCARD ),
-  
-  /** indicates the serialized value is a tombstone of a deleted key */ 
-  TOMBSTONE( EntryBits.setTombstone((byte)0, true) ),
-
-  /** indicates the serialized value is a invalid token*/
-  INVALID( EntryBits.setInvalid((byte)0, true) ),
-
-  /** indicates the serialized tombstone has been garbage collected*/
-  REMOVED_PHASE2( EntryBits.setLocalInvalid((byte)0, true) ),
-  
-  /** indicates the value is serialized */
-  SERIALIZED( EntryBits.setSerialized((byte)0, true) );
-
-  /** the serialized form of the token */
-  private final byte val;
-  
-  private SoplogToken(byte val) {
-    this.val = val;
-  }
-  
-  @Override
-  public String toString() {
-    return super.toString()+" byte:"+val;
-  }
-
-  /**
-   * Returns the serialized form of the token.
-   * @return the byte
-   */
-  public byte toByte() {
-    return val;
-  }
-  
-  /**
-   * Returns true if either of the serialized objects is a wildcard.
-   * 
-   * @param b1 the first object
-   * @param off1 the first offset
-   * @param b2 the second object
-   * @param off2 the second object
-   * @return true if a wildcard
-   */
-  public static boolean isWildcard(byte[] b1, int off1, byte[] b2, int off2) {
-    return b1[off1] == DSCODE.WILDCARD || b2[off2] == DSCODE.WILDCARD;
-  }
-  
-  /**
-   * Returns true if the serialized object is a tombstone.
-   * 
-   * @param b the magic entry type byte
-   * @return true if a tombstone
-   */
-  public static boolean isTombstone(byte b) {
-    return EntryBits.isTombstone(b);
-  }
-
-  /**
-   * Returns true if the serialized object is an invalid token.
-   * 
-   * @param b the magic entry type byte
-   * @return true if invalid
-   */
-  public static boolean isInvalid(byte b) {
-    return EntryBits.isInvalid(b);
-  }
-
-  /**
-   * Returns true if the serialized tombstone was garbage collected
-   * 
-   * @param b the magic entry type byte
-   * @return true if RemovedPhase2
-   */
-  public static boolean isRemovedPhase2(byte b) {
-    return EntryBits.isLocalInvalid(b);
-  }
-
-  /**
-   * Returns true if the serialized object is not any token
-   * 
-   *@param b the magic entry type byte
-   * @return true if not any token
-   */
-  public static boolean isSerialized(byte b) {
-    return EntryBits.isSerialized(b);
-  }
-}
-
-

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9438c8b1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedBuffer.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedBuffer.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedBuffer.java
deleted file mode 100644
index b301ac5..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedBuffer.java
+++ /dev/null
@@ -1,367 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.NavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogFactory.SortedOplogConfiguration;
-import com.gemstone.gemfire.internal.logging.LogService;
-
-/**
- * Provides an in-memory buffer to temporarily hold key/value pairs until they
- * can be flushed to disk.  Each buffer instance can be optionally associated
- * with a user-specified tag for identification purposes.
- * 
- * @param <T> the tag type
- * @author bakera
- */
-public class SortedBuffer<T> extends AbstractSortedReader {
-  private static final Logger logger = LogService.getLogger();
-  
-  /** the tag */
-  private final T tag;
-  
-  /** in-memory sorted key/vaue buffer */
-  private final NavigableMap<byte[], byte[]> buffer;
-
-  /** the stats */
-  private final BufferStats stats;
-  
-  /** the metadata, set during flush */
-  private final EnumMap<Metadata, byte[]> metadata;
-  
-  /** the command to run (or defer) when the flush is complete */
-  private Runnable flushAction;
-  
-  private final String logPrefix;
-  
-  public SortedBuffer(SortedOplogConfiguration config, T tag) {
-    assert config != null;
-    assert tag != null;
-    
-    this.tag = tag;
-    
-    buffer = new ConcurrentSkipListMap<byte[], byte[]>(config.getComparator());
-    stats = new BufferStats();
-    metadata = new EnumMap<Metadata, byte[]>(Metadata.class);
-    
-    this.logPrefix = "<" + config.getName() + "#" + tag + "> ";
-  }
-  
-  /**
-   * Returns the tag associated with the buffer.
-   * @return the tag
-   */
-  public T getTag() {
-    return tag;
-  }
-  
-  @Override
-  public String toString() {
-    return logger.getName() + this.logPrefix;
-  }
-  
-  /**
-   * Adds a new value to the buffer.
-   * @param key the key
-   * @param value the value
-   */
-  public void put(byte[] key, byte[] value) {
-    if (buffer.put(key, value) == null) {
-      // ASSUMPTION: updates don't significantly change the value length
-      // this lets us optimize statistics calculations
-      stats.add(key.length, value.length);
-    }
-  }
-  
-  /**
-   * Allows sorted iteration over the buffer contents.
-   * @return the buffer entries
-   */
-  public Iterable<Entry<byte[], byte[]>> entries() {
-    return buffer.entrySet();
-  }
-  
-  /**
-   * Returns the number of entries in the buffer.
-   * @return the count
-   */
-  public int count() {
-    return buffer.size();
-  }
-  
-  /**
-   * Returns the size of the data in bytes.
-   * @return the data size
-   */
-  public long dataSize() {
-    return stats.totalSize();
-  }
-  
-  /**
-   * Clears the buffer of all entries.
-   */
-  public void clear() {
-    if (logger.isDebugEnabled()) {
-      logger.debug("{}Clearing buffer", this.logPrefix);
-    }
-    
-    buffer.clear();
-    stats.clear();
-    metadata.clear();
-    
-    synchronized (this) {
-      flushAction = null;
-    }
-  }
-  
-  /**
-   * Returns true if the flush completion has been deferred.
-   * @return true if deferred
-   */
-  public synchronized boolean isDeferred() {
-    return flushAction != null;
-  }
-  
-  /**
-   * Defers the flush completion to a later time.  This is used to ensure correct
-   * ordering of soplogs during parallel flushes.
-   * 
-   * @param action the action to perform when ready
-   */
-  public synchronized void defer(Runnable action) {
-    assert flushAction == null;
-    
-    if (logger.isDebugEnabled()) {
-      logger.debug("{}Deferring flush completion", this.logPrefix);
-    }
-    flushAction = action;
-  }
-  
-  /**
-   * Completes the deferred flush operation.
-   */
-  public synchronized void complete() {
-    assert flushAction != null;
-
-    try {
-      if (logger.isDebugEnabled()) {
-        logger.debug("{}Completing deferred flush operation", this.logPrefix);
-      }
-      flushAction.run();
-      
-    } finally {
-      flushAction = null;
-    }
-  }
-  
-  /**
-   * Returns the buffer metadata.
-   * @return the metadata
-   */
-  public synchronized EnumMap<Metadata, byte[]> getMetadata() {
-    return metadata;
-  }
-  
-  /**
-   * Returns the metadata value for the given key.
-   * 
-   * @param name the metadata name
-   * @return the requested metadata
-   */
-  public synchronized byte[] getMetadata(Metadata name) {
-    return metadata.get(name);
-  }
-  
-  /**
-   * Sets the metadata for the buffer.  This is not available until the buffer
-   * is about to be flushed.
-   * 
-   * @param metadata the metadata
-   */
-  public synchronized void setMetadata(EnumMap<Metadata, byte[]> metadata) {
-    if (metadata != null) {
-      this.metadata.putAll(metadata);
-    }
-  }
-  
-  @Override
-  public boolean mightContain(byte[] key) {
-    return true;
-  }
-
-  @Override
-  public ByteBuffer read(byte[] key) throws IOException {
-    byte[] val = buffer.get(key);
-    if (val != null) {
-      return ByteBuffer.wrap(val);
-    }
-    return null;
-  }
-
-  @Override
-  public SortedIterator<ByteBuffer> scan(
-      byte[] from, boolean fromInclusive, 
-      byte[] to, boolean toInclusive,
-      boolean ascending,
-      MetadataFilter filter) {
-
-    if (filter == null || filter.accept(metadata.get(filter.getName()))) {
-      NavigableMap<byte[],byte[]> subset = ascending ? buffer : buffer.descendingMap();
-      if (from == null && to == null) {
-        // we're good
-      } else if (from == null) {
-        subset = subset.headMap(to, toInclusive);
-      } else if (to == null) {
-        subset = subset.tailMap(from, fromInclusive);
-      } else {
-        subset = subset.subMap(from, fromInclusive, to, toInclusive);
-      }
-      return new BufferIterator(subset.entrySet().iterator());
-    }
-    return new BufferIterator(Collections.<byte[], byte[]>emptyMap().entrySet().iterator());
-  }
-
-  @Override
-  public SerializedComparator getComparator() {
-    return (SerializedComparator) buffer.comparator();
-  }
-
-  @Override
-  public SortedStatistics getStatistics() {
-    return stats;
-  }
-  
-  @Override
-  public void close() throws IOException {
-    if (logger.isDebugEnabled()) {
-      logger.debug("{}Closing buffer", this.logPrefix);
-    }
-    
-    synchronized (this) {
-      flushAction = null;
-    }
-  }
-  
-  /**
-   * Allows sorted iteration over the buffer contents.
-   */
-  public static class BufferIterator 
-    extends AbstractKeyValueIterator<ByteBuffer, ByteBuffer>
-    implements SortedIterator<ByteBuffer>
-  {
-    /** the backing iterator */
-    private final Iterator<Entry<byte[], byte[]>> entries;
-    
-    /** the iteration cursor */
-    private Entry<byte[], byte[]> current;
-    
-    public BufferIterator(Iterator<Entry<byte[], byte[]>> iterator) {
-      this.entries = iterator;
-    }
-    
-    @Override
-    public ByteBuffer key() {
-      return ByteBuffer.wrap(current.getKey());
-    }
-
-    @Override
-    public ByteBuffer value() {
-      return ByteBuffer.wrap(current.getValue());
-    }
-
-    @Override
-    public void close() {
-    }
-
-    @Override
-    protected boolean step() {
-      return (current = entries.hasNext() ? entries.next() : null) != null;
-    }
-  }
-  
-  private class BufferStats implements SortedStatistics {
-    /** data size */
-    private long totalSize;
-    
-    /** key count */
-    private long keys;
-    
-    /** avg key size */
-    private double avgKeySize;
-    
-    /** avg value size */
-    private double avgValueSize;
-    
-    private synchronized void clear() {
-      totalSize = 0;
-      keys = 0;
-      avgKeySize = 0;
-      avgValueSize = 0;
-    }
-    
-    private synchronized void add(int keyLength, int valueLength) {
-      totalSize += keyLength + valueLength;
-      avgKeySize = (keyLength + keys * avgKeySize) / (keys + 1);
-      avgValueSize = (keyLength + keys * avgValueSize) / (keys + 1);
-      
-      keys++;
-    }
-    
-    @Override
-    public synchronized long keyCount() {
-      return keys;
-    }
-
-    @Override
-    public byte[] firstKey() {
-      return buffer.firstKey();
-    }
-
-    @Override
-    public byte[] lastKey() {
-      return buffer.lastKey();
-    }
-
-    @Override
-    public synchronized double avgKeySize() {
-      return avgKeySize;
-    }
-    
-    @Override
-    public synchronized double avgValueSize() {
-      return avgValueSize;
-    }
-    
-    @Override
-    public void close() {
-    }
-
-    public synchronized long totalSize() {
-      return totalSize;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9438c8b1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplog.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplog.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplog.java
deleted file mode 100644
index 95fb411..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplog.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.EnumMap;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.Metadata;
-
-/**
- * Defines the API for reading and writing sorted key/value pairs.  The keys
- * are expected to be lexicographically comparable {@code byte[]} arrays.
- * 
- * @author bakera
- */
-public interface SortedOplog {
-  /**
-   * Checks if a key may be present in a set.
-   */
-  public interface BloomFilter {
-    /**
-     * Returns true if the bloom filter might contain the supplied key.  The 
-     * nature of the bloom filter is such that false positives are allowed, but
-     * false negatives cannot occur.
-     * 
-     * @param key the key to test
-     * @return true if the key might be present
-     */
-    boolean mightContain(byte[] key);
-  }
-  
-  /**
-   * Reads key/value pairs from the sorted file.
-   */
-  public interface SortedOplogReader extends SortedReader<ByteBuffer> {
-    /**
-     * Returns the bloom filter associated with this reader.
-     * @return the bloom filter
-     */
-    BloomFilter getBloomFilter();
-    
-    /**
-     * Returns the metadata value for the given key.
-     * 
-     * @param name the metadata name
-     * @return the requested metadata
-     * @throws IOException error reading metadata
-     */
-    byte[] getMetadata(Metadata name) throws IOException;
-    
-    /**
-     * Returns the file used to persist the soplog contents.
-     * @return the file
-     */
-    File getFile();
-    
-    /**
-     * @return file name
-     */
-    String getFileName();
-    
-    /**
-     * renames the file to the input name
-     * 
-     * @throws IOException
-     */
-    void rename(String name) throws IOException;
-    
-    /**
-     * @return the modification timestamp of the file
-     * @throws IOException 
-     */
-    long getModificationTimeStamp() throws IOException;
-    
-    /**
-     * Deletes the sorted oplog file
-     */
-    public void delete() throws IOException;
-
-    /**
-     * Returns true if the reader is closed.
-     * @return true if closed
-     */
-    boolean isClosed();
-  }
-  
-  /**
-   * Writes key/value pairs in a sorted manner.  Each entry that is appended
-   * must have a key that is greater than or equal to the previous key.
-   */
-  public interface SortedOplogWriter {
-    /**
-     * Appends another key and value.  The key is expected to be greater than
-     * or equal to the last key that was appended.
-     * 
-     * @param key the key
-     * @param value the value
-     * @throws IOException write error
-     */
-    void append(ByteBuffer key, ByteBuffer value) throws IOException;
-
-    /**
-     * Appends another key and value.  The key is expected to be greater than
-     * or equal to the last key that was appended.
-     * 
-     * @param key the key
-     * @param value the value
-     * @throws IOException write error
-     */
-    void append(byte[] key, byte[] value) throws IOException;
-
-    /**
-     * Closes the file, first writing optional user and system metadata. 
-     * 
-     * @param metadata the metadata to include
-     * @throws IOException unable to close file
-     */
-    void close(EnumMap<Metadata, byte[]> metadata) throws IOException;
-    
-    /**
-     * Invoked to close and remove the file to clean up after an error.
-     * @throws IOException error closing
-     */
-    void closeAndDelete() throws IOException;
-  }
-  
-  /**
-   * Creates a new sorted reader.
-   * 
-   * @return the reader
-   * @throws IOException error creating reader
-   */
-  SortedOplogReader createReader() throws IOException;
-  
-  /**
-   * Creates a new sorted writer.
-   * 
-   * @return the writer
-   * @throws IOException error creating writer
-   */
-  SortedOplogWriter createWriter() throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9438c8b1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogFactory.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogFactory.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogFactory.java
deleted file mode 100644
index a470d7e..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogFactory.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.EnumMap;
-
-import org.apache.hadoop.hbase.io.hfile.BlockCache;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.Compactor.MetadataCompactor;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.Metadata;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SerializedComparator;
-
-/**
- * Provides a means to construct a soplog.
- */
-public interface SortedOplogFactory {
-  /**
-   * Configures a <code>SortedOplog</code>.
-   * 
-   * @author bakera
-   */
-  public class SortedOplogConfiguration {
-    /** the default metadata compactor */
-    public static MetadataCompactor DEFAULT_METADATA_COMPACTOR = new MetadataCompactor() {
-      @Override
-      public byte[] compact(byte[] metadata1, byte[] metadata2) {
-        return metadata1;
-      }
-    };
-    
-    /**
-     * Defines the available checksum algorithms.
-     */
-    public enum Checksum {
-      NONE,
-      CRC32
-    }
-    
-    /**
-     * Defines the available compression algorithms.
-     */
-    public enum Compression { 
-      NONE, 
-    }
-    
-    /**
-     * Defines the available key encodings.
-     */
-    public enum KeyEncoding { 
-      NONE, 
-    }
-
-    /** the soplog name */
-    private final String name;
-    
-    /** the statistics */
-    private final SortedOplogStatistics stats;
-    
-    private final HFileStoreStatistics storeStats;
-    
-    /** true if bloom filters are enabled */
-    private boolean bloom;
-    
-    /** the soplog block size */
-    private int blockSize;
-    
-    /** the number of bytes for each checksum */
-    private int bytesPerChecksum;
-    
-    /** the checksum type */
-    private Checksum checksum;
-    
-    /** the compression type */
-    private Compression compression;
-    
-    /** the key encoding type */
-    private KeyEncoding keyEncoding;
-    
-    /** the comparator */
-    private SerializedComparator comparator;
-
-    /** metadata comparers */
-    private EnumMap<Metadata, MetadataCompactor> metaCompactors;
-
-    private BlockCache blockCache;
-
-    private boolean cacheDataBlocksOnRead;
-    
-    public SortedOplogConfiguration(String name) {
-      this(name, null, new SortedOplogStatistics("GridDBRegionStatistics", name), new HFileStoreStatistics("GridDBStoreStatistics", name));
-    }
-    
-    public SortedOplogConfiguration(String name, BlockCache blockCache, SortedOplogStatistics stats, HFileStoreStatistics storeStats) {
-      this.name = name;
-      this.stats = stats;
-      
-      // defaults
-      bloom = true;
-      blockSize = 1 << 16;
-      bytesPerChecksum = 1 << 14;
-      checksum = Checksum.NONE;
-      compression = Compression.NONE;
-      keyEncoding = KeyEncoding.NONE;
-      comparator = new ByteComparator();
-      this.cacheDataBlocksOnRead = true;
-      this.storeStats = storeStats;
-      this.blockCache = blockCache;
-    }
-    
-    public SortedOplogConfiguration setBloomFilterEnabled(boolean enabled) {
-      this.bloom = enabled;
-      return this;
-    }
-    
-    public SortedOplogConfiguration setBlockSize(int size) {
-      this.blockSize = size;
-      return this;
-    }
-    
-    public SortedOplogConfiguration setBytesPerChecksum(int bytes) {
-      this.bytesPerChecksum = bytes;
-      return this;
-    }
-    
-    public SortedOplogConfiguration setChecksum(Checksum type) {
-      this.checksum = type;
-      return this;
-    }
-    
-    public SortedOplogConfiguration setCompression(Compression type) {
-      this.compression = type;
-      return this;
-    }
-    
-    public SortedOplogConfiguration setKeyEncoding(KeyEncoding type) {
-      this.keyEncoding = type;
-      return this;
-    }
-    
-    public SortedOplogConfiguration setComparator(SerializedComparator comp) {
-      this.comparator = comp;
-      return this;
-    }
-    
-    public SortedOplogConfiguration addMetadataCompactor(Metadata name, MetadataCompactor compactor) {
-      metaCompactors.put(name, compactor);
-      return this;
-    }
-    
-    /**
-     * Returns the soplog name.
-     * @return the name
-     */
-    public String getName() {
-      return name;
-    }
-
-    /**
-     * Returns the statistics.
-     * @return the statistics
-     */
-    public SortedOplogStatistics getStatistics() {
-      return stats;
-    }
-    
-    public HFileStoreStatistics getStoreStatistics() {
-      return storeStats;
-    }
-    
-    /**
-     * Returns true if the bloom filter is enabled.
-     * @return true if enabled
-     */
-    public boolean isBloomFilterEnabled() {
-      return bloom;
-    }
-
-    /**
-     * Returns the block size in bytes.
-     * @return the block size
-     */
-    public int getBlockSize() {
-      return blockSize;
-    }
-
-    /**
-     * Returns the number of bytes per checksum.
-     * @return the bytes
-     */
-    public int getBytesPerChecksum() {
-      return bytesPerChecksum;
-    }
-
-    /**
-     * Returns the checksum type.
-     * @return the checksum
-     */
-    public Checksum getChecksum() {
-      return checksum;
-    }
-
-    /**
-     * Returns the compression type.
-     * @return the compression
-     */
-    public Compression getCompression() {
-      return compression;
-    }
-
-    /**
-     * Returns the key encoding type.
-     * @return the key encoding
-     */
-    public KeyEncoding getKeyEncoding() {
-      return keyEncoding;
-    }
-
-    /**
-     * Returns the comparator.
-     * @return the comparator
-     */
-    public SerializedComparator getComparator() {
-      return comparator;
-    }
-    
-    /**
-     * Returns the metadata compactor for the given name. 
-     * @param name the metadata name
-     * @return the compactor
-     */
-    public MetadataCompactor getMetadataCompactor(Metadata name) {
-      MetadataCompactor mc = metaCompactors.get(name);
-      if (mc != null) {
-        return mc;
-      }
-      return DEFAULT_METADATA_COMPACTOR;
-    }
-
-    public BlockCache getBlockCache() {
-      return this.blockCache;
-    }
-
-    public boolean getCacheDataBlocksOnRead() {
-      return cacheDataBlocksOnRead ;
-    }
-  }
-  
-  /**
-   * Returns the configuration.
-   * @return the configuration
-   */
-  SortedOplogConfiguration getConfiguration();
-  
-  /**
-   * Creates a new soplog.
-   * 
-   * @param name the filename
-   * @return the soplog
-   * @throws IOException error creating soplog
-   */
-  SortedOplog createSortedOplog(File name) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9438c8b1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogSet.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogSet.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogSet.java
deleted file mode 100644
index 2900229..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogSet.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.EnumMap;
-
-/**
- * Provides a unified view of the current SBuffer, the unflushed SBuffers, and
- * the existing soplogs.
- * 
- * @author bakera
- */
-public interface SortedOplogSet extends SortedReader<ByteBuffer> {
-  /**
-   * Defines a callback handler for asynchronous operations.
-   */
-  public interface FlushHandler {
-    /**
-     * Invoked when the operation completed successfully.
-     */
-    void complete();
-    
-    /**
-     * Invoked when the operation completed with an error.
-     * @param t the error
-     */
-    void error(Throwable t);
-  }
-
-  /**
-   * Inserts or updates an entry in the current buffer.  This invocation may
-   * block if the current buffer is full and there are too many outstanding
-   * write requests.
-   * 
-   * @param key the key
-   * @param value the value
-   * @throws IOException 
-   */
-  void put(byte[] key, byte[] value) throws IOException;
-  
-  /**
-   * Returns the size of the current buffer in bytes.
-   * @return the buffer size
-   */
-  long bufferSize();
-  
-  /**
-   * Returns the size of the unflushed buffers in bytes.
-   * @return the unflushed size
-   */
-  long unflushedSize();
-  
-  /**
-   * Requests that the current buffer be flushed to disk.  This invocation may
-   * block if there are too many outstanding write requests.
-   * 
-   * @param metadata supplemental data to be included in the soplog
-   * @param handler the flush completion callback
-   * @throws IOException error preparing flush
-   */
-  void flush(EnumMap<Metadata, byte[]> metadata, FlushHandler handler) throws IOException;
-
-  /**
-   * Flushes the current buffer and closes the soplog set.  Blocks until the flush
-   * is completed.
-   * 
-   * @param metadata supplemental data to be included in the soplog
-   * @throws IOException error during flush
-   */
-  void flushAndClose(EnumMap<Metadata, byte[]> metadata) throws IOException;
-
-  /**
-   * Returns the configured compaction strategy.
-   * @return the compactor
-   */
-  Compactor getCompactor();
-
-  /**
-   * Clears the current buffer, any existing buffers, and all active soplogs.
-   * 
-   * @throws IOException unable to clear
-   */
-  void clear() throws IOException;
-  
-  /**
-   * Clears existing and closes the soplog set.
-   * @throws IOException unable to destroy
-   */
-  void destroy() throws IOException;
-  
-  /**
-   * Returns true if the set is closed.
-   * @return true if closed
-   */
-  boolean isClosed();
-
-  /**
-   * Returns the soplog factory.
-   * @return the factory
-   */
-  SortedOplogFactory getFactory();
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9438c8b1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogSetImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogSetImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogSetImpl.java
deleted file mode 100644
index 2cf1191..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogSetImpl.java
+++ /dev/null
@@ -1,780 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Deque;
-import java.util.EnumMap;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplog.SortedOplogReader;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplog.SortedOplogWriter;
-import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.util.AbortableTaskService;
-import com.gemstone.gemfire.internal.util.AbortableTaskService.AbortableTask;
-
-/**
- * Provides a unifies view across a set of sbuffers and soplogs.  Updates are 
- * made into the current sbuffer.  When requested, the current sbuffer will be
- * flushed and subsequent updates will flow into a new sbuffer.  All flushes are
- * done on a background thread.
- * 
- * @author bakera
- */
-public class SortedOplogSetImpl extends AbstractSortedReader implements SortedOplogSet {
-  private static final Logger logger = LogService.getLogger();
-  
-  /** creates new soplogs */
-  private final SortedOplogFactory factory;
-  
-  /** the background flush thread pool */
-  private final AbortableTaskService flusher;
-  
-  /** the compactor */
-  private final Compactor compactor;
-  
-  /** the current sbuffer */
-  private final AtomicReference<SortedBuffer<Integer>> current;
-  
-  /** the buffer count */
-  private final AtomicInteger bufferCount;
-  
-  /** the unflushed sbuffers */
-  private final Deque<SortedBuffer<Integer>> unflushed;
-  
-  /** the lock for access to unflushed and soplogs */
-  private final ReadWriteLock rwlock;
-  
-  /** test hook for clear/close/destroy during flush */
-  volatile CountDownLatch testDelayDuringFlush;
-  
-  /** test hook to cause IOException during flush */
-  volatile boolean testErrorDuringFlush;
-  
-  private final String logPrefix;
-  
-  public SortedOplogSetImpl(final SortedOplogFactory factory, Executor exec, Compactor ctor) throws IOException {
-    this.factory = factory;
-    this.flusher = new AbortableTaskService(exec);
-    this.compactor = ctor;
-    
-    rwlock = new ReentrantReadWriteLock();
-    bufferCount = new AtomicInteger(0);
-    unflushed = new ArrayDeque<SortedBuffer<Integer>>();
-    current = new AtomicReference<SortedBuffer<Integer>>(
-        new SortedBuffer<Integer>(factory.getConfiguration(), 0));
-    
-    this.logPrefix = "<" + factory.getConfiguration().getName() + "> ";
-    if (logger.isDebugEnabled()) {
-      logger.debug("{}Creating soplog set", this.logPrefix);
-    }
-  }
-  
-  @Override
-  public boolean mightContain(byte[] key) throws IOException {
-    // loops through the following readers:
-    //   current sbuffer
-    //   unflushed sbuffers
-    //   soplogs
-    //
-    // The loop has been unrolled for efficiency.
-    //
-    if (getCurrent().mightContain(key)) {
-      return true;
-    }
-
-    // snapshot the sbuffers and soplogs for stable iteration
-    List<SortedReader<ByteBuffer>> readers;
-    Collection<TrackedReference<SortedOplogReader>> soplogs;
-    rwlock.readLock().lock();
-    try {
-      readers = new ArrayList<SortedReader<ByteBuffer>>(unflushed);
-      soplogs = compactor.getActiveReaders(key, key);
-      for (TrackedReference<SortedOplogReader> tr : soplogs) {
-        readers.add(tr.get());
-      }
-    } finally {
-      rwlock.readLock().unlock();
-    }
-
-    try {
-      for (SortedReader<ByteBuffer> rdr : readers) {
-        if (rdr.mightContain(key)) {
-          return true;
-        }
-      }
-      return false;
-    } finally {
-      TrackedReference.decrementAll(soplogs);
-    }
-  }
-
-  @Override
-  public ByteBuffer read(byte[] key) throws IOException {
-    // loops through the following readers:
-    //   current sbuffer
-    //   unflushed sbuffers
-    //   soplogs
-    //
-    // The loop has been slightly unrolled for efficiency.
-    //
-    ByteBuffer val = getCurrent().read(key);
-    if (val != null) {
-      return val;
-    }
-
-    // snapshot the sbuffers and soplogs for stable iteration
-    List<SortedReader<ByteBuffer>> readers;
-    Collection<TrackedReference<SortedOplogReader>> soplogs;
-    rwlock.readLock().lock();
-    try {
-      readers = new ArrayList<SortedReader<ByteBuffer>>(unflushed);
-      soplogs = compactor.getActiveReaders(key, key);
-      for (TrackedReference<SortedOplogReader> tr : soplogs) {
-        readers.add(tr.get());
-      }
-    } finally {
-      rwlock.readLock().unlock();
-    }
-    
-    try {
-      for (SortedReader<ByteBuffer> rdr : readers) {
-        if (rdr.mightContain(key)) {
-          val = rdr.read(key);
-          if (val != null) {
-            return val;
-          }
-        }
-      }
-      return null;
-    } finally {
-      TrackedReference.decrementAll(soplogs);
-    }
-  }
-
-  @Override
-  public SortedIterator<ByteBuffer> scan(
-      byte[] from, boolean fromInclusive, 
-      byte[] to, boolean toInclusive,
-      boolean ascending,
-      MetadataFilter filter) throws IOException {
-
-    SerializedComparator sc = factory.getConfiguration().getComparator();
-    sc = ascending ? sc : ReversingSerializedComparator.reverse(sc);
-
-    List<SortedIterator<ByteBuffer>> scans = new ArrayList<SortedIterator<ByteBuffer>>();
-    Collection<TrackedReference<SortedOplogReader>> soplogs;
-    rwlock.readLock().lock();
-    try {
-      scans.add(getCurrent().scan(from, fromInclusive, to, toInclusive, ascending, filter));
-      for (SortedBuffer<Integer> sb : unflushed) {
-        scans.add(sb.scan(from, fromInclusive, to, toInclusive, ascending, filter));
-      }
-      soplogs = compactor.getActiveReaders(from, to);
-    } finally {
-      rwlock.readLock().unlock();
-    }
-
-    for (TrackedReference<SortedOplogReader> tr : soplogs) {
-      scans.add(tr.get().scan(from, fromInclusive, to, toInclusive, ascending, filter));
-    }
-    return new MergedIterator(sc, soplogs, scans);
-  }
-
-  @Override
-  public void put(byte[] key, byte[] value) {
-    assert key != null;
-    assert value != null;
-    
-    long start = factory.getConfiguration().getStatistics().getPut().begin();
-    getCurrent().put(key, value);
-    factory.getConfiguration().getStatistics().getPut().end(value.length, start);
-  }
-
-  @Override
-  public long bufferSize() {
-    return getCurrent().dataSize();
-  }
-
-  @Override
-  public long unflushedSize() {
-    long size = 0;
-    rwlock.readLock().lock();
-    try {
-      for (SortedBuffer<Integer> sb : unflushed) {
-        size += sb.dataSize();
-      }
-    } finally {
-      rwlock.readLock().unlock();
-    }
-    return size;
-  }
-  
-  @Override
-  public void flushAndClose(EnumMap<Metadata, byte[]> metadata) throws IOException {
-    final AtomicReference<Throwable> err = new AtomicReference<Throwable>(null);
-    flush(metadata, new FlushHandler() {
-      @Override public void complete() { }
-      @Override public void error(Throwable t) { err.set(t); }
-    });
-    
-    // waits for flush completion
-    close();
-    
-    Throwable t = err.get();
-    if (t != null) {
-      throw new IOException(t);
-    }
-  }
-  
-  @Override
-  public void flush(EnumMap<Metadata, byte[]> metadata, FlushHandler handler) {
-    assert handler != null;
-    
-    long start = factory.getConfiguration().getStatistics().getFlush().begin();
-    
-    // flip to a new buffer
-    final SortedBuffer<Integer> sb;
-    rwlock.writeLock().lock();
-    try {
-      if (isClosed()) {
-        handler.complete();
-        factory.getConfiguration().getStatistics().getFlush().end(0, start);
-        
-        return;
-      }
-      
-      sb = flipBuffer();
-      if (sb.count() == 0) {
-        if (logger.isDebugEnabled()) {
-          logger.debug("{}Skipping flush of empty buffer {}", this.logPrefix, sb);
-        }
-        handler.complete();
-        return;
-      }
-      
-      sb.setMetadata(metadata);
-      unflushed.addFirst(sb);
-    
-      // Note: this is queued while holding the lock to ensure correct ordering
-      // on the executor queue.  Don't use a bounded queue here or we will block
-      // the flush invoker.
-      flusher.execute(new FlushTask(handler, sb, start));
-      
-    } finally {
-      rwlock.writeLock().unlock();
-    }
-  }
-
-  @Override
-  public void clear() throws IOException {
-    if (logger.isDebugEnabled()) {
-      logger.debug("{}Clearing soplog set", this.logPrefix);
-    }
-
-    long start = factory.getConfiguration().getStatistics().getClear().begin();
-
-    // acquire lock to ensure consistency with flushes
-    rwlock.writeLock().lock();
-    try {
-      SortedBuffer<Integer> tmp = current.get();
-      if (tmp != null) {
-        tmp.clear();
-      }
-
-      flusher.abortAll();
-      for (SortedBuffer<Integer> sb : unflushed) {
-        sb.clear();
-      }
-      
-      unflushed.clear();
-      compactor.clear();
-
-      releaseTestDelay();
-      flusher.waitForCompletion();
-      factory.getConfiguration().getStatistics().getClear().end(start);
-      
-    } catch (IOException e) {
-      factory.getConfiguration().getStatistics().getClear().error(start);
-      throw (IOException) e.fillInStackTrace();
-      
-    } finally {
-      rwlock.writeLock().unlock();
-    }
-  }
-  
-  @Override
-  public void destroy() throws IOException {
-    if (logger.isDebugEnabled()) {
-      logger.debug("{}Destroying soplog set", this.logPrefix);
-    }
-
-    long start = factory.getConfiguration().getStatistics().getDestroy().begin();
-    try {
-      unsetCurrent();
-      clear();
-      close();
-      
-      factory.getConfiguration().getStatistics().getDestroy().end(start);
-      
-    } catch (IOException e) {
-      factory.getConfiguration().getStatistics().getDestroy().error(start);
-      throw (IOException) e.fillInStackTrace();
-    }
-  }
-  
-  @Override
-  public void close() throws IOException {
-    if (logger.isDebugEnabled()) {
-      logger.debug("{}Closing soplog set", this.logPrefix);
-    }
-
-    unsetCurrent();
-    releaseTestDelay();
-
-    flusher.waitForCompletion();
-    compactor.close();
-  }
-
-  @Override
-  public SerializedComparator getComparator() {
-    return factory.getConfiguration().getComparator();
-  }
-
-  @Override
-  public SortedStatistics getStatistics() throws IOException {
-    List<SortedStatistics> stats = new ArrayList<SortedStatistics>();
-    Collection<TrackedReference<SortedOplogReader>> soplogs;
-    
-    // snapshot, this is expensive
-    rwlock.readLock().lock();
-    try {
-      stats.add(getCurrent().getStatistics());
-      for (SortedBuffer<Integer> sb : unflushed) {
-        stats.add(sb.getStatistics());
-      }
-      soplogs = compactor.getActiveReaders(null, null);
-    } finally {
-      rwlock.readLock().unlock();
-    }
-    
-    for (TrackedReference<SortedOplogReader> tr : soplogs) {
-      stats.add(tr.get().getStatistics());
-    }
-    return new MergedStatistics(stats, soplogs);
-  }
-
-  @Override
-  public Compactor getCompactor() {
-    return compactor;
-  }
-  
-  @Override
-  public boolean isClosed() {
-    return current.get() == null;
-  }
-  
-  @Override
-  public SortedOplogFactory getFactory() {
-    return factory;
-  }
-  
-  private SortedBuffer<Integer> flipBuffer() {
-    final SortedBuffer<Integer> sb;
-    sb = getCurrent();
-    SortedBuffer<Integer> next = new SortedBuffer<Integer>(
-        factory.getConfiguration(), 
-        bufferCount.incrementAndGet());
-  
-    current.set(next);
-    if (logger.isDebugEnabled()) {
-      logger.debug("{}Switching from buffer {} to {}", this.logPrefix, sb, next);
-    }
-    return sb;
-  }
-
-  private SortedBuffer<Integer> getCurrent() {
-    SortedBuffer<Integer> tmp = current.get();
-    if (tmp == null) {
-      throw new IllegalStateException("Closed");
-    }
-    return tmp;
-  }
-  
-  private void unsetCurrent() {
-    rwlock.writeLock().lock();
-    try {
-      SortedBuffer<Integer> tmp = current.getAndSet(null);
-      if (tmp != null) {
-        tmp.clear();
-      }
-    } finally {
-      rwlock.writeLock().unlock();
-    }
-  }
-  
-  private void releaseTestDelay() {
-    if (testDelayDuringFlush != null) {
-      if (logger.isDebugEnabled()) {
-        logger.debug("{}Releasing testDelayDuringFlush", this.logPrefix);
-      }
-      testDelayDuringFlush.countDown();
-    }
-  }
-
-  private class FlushTask implements AbortableTask {
-    private final FlushHandler handler;
-    private final SortedBuffer<Integer> buffer;
-    private final long start;
-    
-    public FlushTask(FlushHandler handler, SortedBuffer<Integer> buffer, long start) {
-      this.handler = handler;
-      this.buffer = buffer;
-      this.start = start;
-    }
-    
-    @Override
-    public void runOrAbort(final AtomicBoolean aborted) {
-      try {
-        // First transfer the contents of the buffer to a new soplog.
-        final SortedOplog soplog = writeBuffer(buffer, aborted);
-        
-        // If we are aborted, someone else will cleanup the unflushed queue
-        if (soplog == null || !lockOrAbort(aborted)) {
-          handler.complete();
-          return;
-        }
-
-        try {
-          Runnable action = new Runnable() {
-            @Override
-            public void run() {
-              try {
-                compactor.add(soplog);
-                compactor.compact(false, null);
-
-                unflushed.removeFirstOccurrence(buffer);
-                
-                // TODO need to invoke this while NOT holding write lock
-                handler.complete();
-                factory.getConfiguration().getStatistics().getFlush().end(buffer.dataSize(), start);
-                
-              } catch (Exception e) {
-                handleError(e, aborted);
-                return;
-              }
-            }
-          };
-          
-          // Enforce flush ordering for consistency.  If the previous buffer flush
-          // is incomplete, we defer completion and release the thread to avoid
-          // deadlocks.
-          if (buffer == unflushed.peekLast()) {
-            action.run();
-            
-            SortedBuffer<Integer> tail = unflushed.peekLast();
-            while (tail != null && tail.isDeferred() && !aborted.get()) {
-              // TODO need to invoke this while NOT holding write lock
-              tail.complete();
-              tail = unflushed.peekLast();
-            }
-          } else {
-            buffer.defer(action);
-          }
-        } finally {
-          rwlock.writeLock().unlock();
-        }
-      } catch (Exception e) {
-        handleError(e, aborted);
-      }
-    }
-    
-    @Override
-    public void abortBeforeRun() {
-      handler.complete();
-      factory.getConfiguration().getStatistics().getFlush().end(start);
-    }
-    
-    private void handleError(Exception e, AtomicBoolean aborted) {
-      if (lockOrAbort(aborted)) {
-        try {
-          unflushed.removeFirstOccurrence(buffer);
-        } finally {
-          rwlock.writeLock().unlock();
-        }
-      }  
-
-      handler.error(e);
-      factory.getConfiguration().getStatistics().getFlush().error(start);
-    }
-    
-    private SortedOplog writeBuffer(SortedBuffer<Integer> sb, AtomicBoolean aborted) 
-        throws IOException {
-      File f = compactor.getFileset().getNextFilename();
-      if (logger.isDebugEnabled()) {
-        logger.debug("{}Flushing buffer {} to {}", SortedOplogSetImpl.this.logPrefix, sb, f);
-      }
-
-      SortedOplog so = factory.createSortedOplog(f);
-      SortedOplogWriter writer = so.createWriter();
-      try {
-        if (testErrorDuringFlush) {
-          throw new IOException("Flush error due to testErrorDuringFlush=true");
-        }
-        
-        for (Entry<byte[], byte[]> entry : sb.entries()) {
-          if (aborted.get()) {
-            writer.closeAndDelete();
-            return null;
-          }
-          writer.append(entry.getKey(), entry.getValue());
-        }
-   
-        checkTestDelay();
-        
-        writer.close(buffer.getMetadata());
-        return so;
-   
-      } catch (IOException e) {
-        if (logger.isDebugEnabled()) {
-          logger.debug("{}Encountered error while flushing buffer {}", SortedOplogSetImpl.this.logPrefix, sb, e);
-        }
-        
-        writer.closeAndDelete();
-        throw e;
-      }
-    }
-
-    private void checkTestDelay() {
-      if (testDelayDuringFlush != null) {
-        try {
-          if (logger.isDebugEnabled()) {
-            logger.debug("{}Waiting for testDelayDuringFlush", SortedOplogSetImpl.this.logPrefix);
-          }
-          testDelayDuringFlush.await();
-          
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-        }
-      }
-    }
-
-    private boolean lockOrAbort(AtomicBoolean abort) {
-      try {
-        while (!abort.get()) {
-          if (rwlock.writeLock().tryLock(10, TimeUnit.MILLISECONDS)) {
-            return true;
-          }
-        }
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
-      return false;
-    }
-  }
-  
-  private class MergedStatistics implements SortedStatistics {
-    private final List<SortedStatistics> stats;
-    private final Collection<TrackedReference<SortedOplogReader>> soplogs;
-
-    public MergedStatistics(List<SortedStatistics> stats, Collection<TrackedReference<SortedOplogReader>> soplogs) {
-      this.stats = stats;
-      this.soplogs = soplogs;
-    }
-    
-    @Override
-    public long keyCount() {
-      // TODO we have no way of determining the overall key population
-      // just assume no overlap for now
-      long keys = 0;
-      for (SortedStatistics ss : stats) {
-        keys += ss.keyCount();
-      }
-      return keys;
-    }
-
-    @Override
-    public byte[] firstKey() {
-      byte[] first = stats.get(0).firstKey();
-      for (int i = 1; i < stats.size(); i++) {
-        byte[] tmp = stats.get(i).firstKey();
-        if (getComparator().compare(first, tmp) > 0) {
-          first = tmp;
-        }
-      }
-      return first;
-    }
-
-    @Override
-    public byte[] lastKey() {
-      byte[] last = stats.get(0).lastKey();
-      for (int i = 1; i < stats.size(); i++) {
-        byte[] tmp = stats.get(i).lastKey();
-        if (getComparator().compare(last, tmp) < 0) {
-          last = tmp;
-        }
-      }
-      return last;
-    }
-
-    @Override
-    public double avgKeySize() {
-      double avg = 0;
-      for (SortedStatistics ss : stats) {
-        avg += ss.avgKeySize();
-      }
-      return avg / stats.size();
-    }
-
-    @Override
-    public double avgValueSize() {
-      double avg = 0;
-      for (SortedStatistics ss : stats) {
-        avg += ss.avgValueSize();
-      }
-      return avg / stats.size();
-    }
-
-    @Override
-    public void close() {
-      TrackedReference.decrementAll(soplogs);
-    }
-  }
-  
-  /**
-   * Provides ordered iteration across a set of sorted data sets. 
-   */
-  public static class MergedIterator 
-    extends AbstractKeyValueIterator<ByteBuffer, ByteBuffer> 
-    implements SortedIterator<ByteBuffer>
-  {
-    /** the comparison operator */
-    private final SerializedComparator comparator;
-    
-    /** the reference counted soplogs */
-    private final Collection<TrackedReference<SortedOplogReader>> soplogs;
-
-    /** the backing iterators */
-    private final List<SortedIterator<ByteBuffer>> iters;
-    
-    /** the current key */
-    private ByteBuffer key;
-    
-    /** the current value */
-    private ByteBuffer value;
-    
-    public MergedIterator(SerializedComparator comparator, 
-        Collection<TrackedReference<SortedOplogReader>> soplogs, 
-        List<SortedIterator<ByteBuffer>> iters) {
-      this.comparator = comparator;
-      this.soplogs = soplogs;
-      this.iters = iters;
-      
-      // initialize iteration positions
-      int i = 0;
-      while (i < iters.size()) {
-        i = advance(i);
-      }
-    }
-    
-    @Override
-    public ByteBuffer key() {
-      return key;
-    }
-
-    @Override
-    public ByteBuffer value() {
-      return value;
-    }
-
-    @Override
-    protected boolean step() {
-      if (iters.isEmpty() || readerIsClosed()) {
-        return false;
-      }
-      
-      int cursor = 0;
-      key = iters.get(cursor).key();
-      
-      int i = 1;
-      while (i < iters.size()) {
-        ByteBuffer tmp = iters.get(i).key();
-        
-        int diff = comparator.compare(tmp.array(), tmp.arrayOffset(), tmp.remaining(), 
-            key.array(), key.arrayOffset(), key.remaining());
-        if (diff < 0) {
-          cursor = i++;
-          key = tmp;
-          
-        } else if (diff == 0) {
-          i = advance(i);
-          
-        } else {
-          i++;
-        }
-      }
-      
-      value = iters.get(cursor).value();
-      advance(cursor);
-      
-      return true;
-    }
-    
-    @Override
-    public void close() {
-      for (SortedIterator<ByteBuffer> iter : iters) {
-        iter.close();
-      }
-      TrackedReference.decrementAll(soplogs);
-    }
-
-    private int advance(int idx) {
-      // either advance the cursor or remove the iterator
-      if (!iters.get(idx).hasNext()) {
-        iters.remove(idx).close();
-        return idx;
-      }
-      iters.get(idx).next();
-      return idx + 1;
-    }
-    
-    private boolean readerIsClosed() {
-      for (TrackedReference<SortedOplogReader> tr : soplogs) {
-        if (tr.get().isClosed()) {
-          return true;
-        }
-      }
-      return false;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9438c8b1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/BlockCacheHolder.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/BlockCacheHolder.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/BlockCacheHolder.java
deleted file mode 100644
index eb5154c..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/BlockCacheHolder.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog.hfile;
-
-import org.apache.hadoop.hbase.io.hfile.BlockCache;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.HFileStoreStatistics;
-
-public class BlockCacheHolder {
-  private BlockCache cache;
-  private HFileStoreStatistics stats;
-  
-  public BlockCacheHolder(HFileStoreStatistics stats, BlockCache cache) {
-    this.stats = stats;
-    this.cache = cache;
-  }
-
-  public synchronized BlockCache getBlockCache() {
-    return cache;
-  }
-  
-  public synchronized HFileStoreStatistics getHFileStoreStats() {
-    return stats;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9438c8b1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/HFileSortedOplog.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/HFileSortedOplog.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/HFileSortedOplog.java
deleted file mode 100644
index 56c6960..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/HFileSortedOplog.java
+++ /dev/null
@@ -1,694 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog.hfile;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.Map.Entry;
-import java.util.NoSuchElementException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.AbstractSortedReader;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.DelegatingSerializedComparator;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.ReversingSerializedComparator;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedBuffer.BufferIterator;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplog;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogFactory.SortedOplogConfiguration;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.Metadata;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SerializedComparator;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SortedIterator;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SortedStatistics;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.util.Bytes;
-import com.gemstone.gemfire.internal.util.Hex;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
-import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
-import org.apache.hadoop.hbase.io.hfile.HFileScanner;
-import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
-import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
-import org.apache.hadoop.hbase.util.BloomFilterFactory;
-import org.apache.hadoop.hbase.util.BloomFilterWriter;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.logging.log4j.Logger;
-import com.gemstone.gemfire.internal.logging.LogService;
-
-/**
- * Provides a soplog backed by an HFile.
- * 
- * @author bakera
- */
-public class HFileSortedOplog implements SortedOplog {
-  public static final byte[] MAGIC          = new byte[] { 0x53, 0x4F, 0x50 };
-  public static final byte[] VERSION_1      = new byte[] { 0x1 };
-  
-  // FileInfo is not visible
-  private static final byte[] AVG_KEY_LEN   = "hfile.AVG_KEY_LEN".getBytes();
-  private static final byte[] AVG_VALUE_LEN = "hfile.AVG_VALUE_LEN".getBytes();
-  
-  /** a default bloom filter */
-  private static final BloomFilter DUMMY_BLOOM = new BloomFilter() {
-    @Override
-    public boolean mightContain(byte[] key) {
-      return true;
-    }
-  };
-
-  static final Configuration hconf;
-  private static final FileSystem fs;
-
-  static {
-    // Leave these HBase properties set to defaults for now
-    //
-    // hfile.block.cache.size (25% of heap)
-    // hbase.hash.type (murmur)
-    // hfile.block.index.cacheonwrite (false)
-    // hfile.index.block.max.size (128k)
-    // hfile.format.version (2)
-    // io.storefile.bloom.block.size (128k)
-    // hfile.block.bloom.cacheonwrite (false)
-    // hbase.rs.cacheblocksonwrite (false)
-    // hbase.offheapcache.minblocksize (64k)
-    // hbase.offheapcache.percentage (0)
-    hconf = new Configuration();
-
-    hconf.setBoolean("hbase.metrics.showTableName", true);
-    SchemaMetrics.configureGlobally(hconf);
-
-    try {
-      fs = FileSystem.get(hconf);
-    } catch (IOException e) {
-      throw new IllegalStateException(e);
-    }
-  }
-
-  private static enum InternalMetadata {
-    /** identifies the soplog as a gemfire file, required */
-    GEMFIRE_MAGIC,
-    
-    /** identifies the soplog version, required */
-    VERSION,
-    
-    /** identifies the statistics data */
-    STATISTICS,
-
-    /** identifies the names of embedded comparators */
-    COMPARATORS;
-
-    public byte[] bytes() {
-      return ("gemfire." + name()).getBytes();
-    }
-  }
-  
-  //logger instance
-  private static final Logger logger = LogService.getLogger();
-  protected final String logPrefix;
-  
-  /** the configuration */
-  private final SortedOplogConfiguration sopConfig;
-  
-  /** the hfile cache config */
-  private final CacheConfig hcache;
-  
-  /** the hfile location */
-  private Path path;
-  
-  public HFileSortedOplog(File hfile, SortedOplogConfiguration sopConfig) throws IOException {
-    assert hfile != null;
-    assert sopConfig != null;
-    
-    this.sopConfig = sopConfig;
-    path = fs.makeQualified(new Path(hfile.toString()));
-    
-//    hcache = new CacheConfig(hconf, sopConfig.getCacheDataBlocksOnRead(), sopConfig.getBlockCache(), 
-//        HFileSortedOplogFactory.convertStatistics(sopConfig.getStatistics(), sopConfig.getStoreStatistics()));
-    hcache = new CacheConfig(hconf);
-    this.logPrefix = "<" + sopConfig.getName() + "> ";
-  }
-
-  @Override
-  public SortedOplogReader createReader() throws IOException {
-    if (logger.isDebugEnabled()) {
-      logger.debug("{}Creating an HFile reader on " + path, logPrefix);
-    }
-    
-    return new HFileSortedOplogReader();
-  }
-
-  @Override
-  public SortedOplogWriter createWriter() throws IOException {
-    if (logger.isDebugEnabled()) {
-      logger.debug("{}Creating an HFile writer on " + path, logPrefix);
-    }
-
-    return new HFileSortedOplogWriter();  
-  }
-  
-  SortedOplogConfiguration getConfiguration() {
-    return sopConfig;
-  }
-  
-  private class HFileSortedOplogReader extends AbstractSortedReader implements SortedOplogReader {
-    private final Reader reader;
-    private final BloomFilter bloom;
-    private final SortedStatistics stats;
-    private volatile boolean closed;
-    
-    public HFileSortedOplogReader() throws IOException {
-      reader = HFile.createReader(fs, path, hcache);
-      validate();
-      
-      stats = new HFileSortedStatistics(reader);
-      closed = false;
-      
-      if (reader.getComparator() instanceof DelegatingSerializedComparator) {
-        loadComparators((DelegatingSerializedComparator) reader.getComparator());
-      }
-
-      DataInput bin = reader.getGeneralBloomFilterMetadata();
-      if (bin != null) {
-        final org.apache.hadoop.hbase.util.BloomFilter hbloom = BloomFilterFactory.createFromMeta(bin, reader);
-        if (reader.getComparator() instanceof DelegatingSerializedComparator) {
-          loadComparators((DelegatingSerializedComparator) hbloom.getComparator());
-        }
-
-        bloom = new BloomFilter() {
-          @Override
-          public boolean mightContain(byte[] key) {
-            assert key != null;
-            
-            long start = sopConfig.getStatistics().getBloom().begin();
-            boolean foundKey = hbloom.contains(key, 0, key.length, null);
-            sopConfig.getStatistics().getBloom().end(start);
-            
-            if (logger.isTraceEnabled()) {
-              logger.trace(String.format("{}Bloom check on %s for key %s: %b", 
-                  path, Hex.toHex(key), foundKey), logPrefix);
-            }
-            return foundKey;
-          }
-        };
-        
-      } else {
-        bloom = DUMMY_BLOOM;
-      }
-    }
-    
-    @Override
-    public boolean mightContain(byte[] key) {
-      return getBloomFilter().mightContain(key);
-    }
-
-    @Override
-    public ByteBuffer read(byte[] key) throws IOException {
-      assert key != null;
-      
-      if (logger.isTraceEnabled()) {
-        logger.trace(String.format("{}Reading key %s from %s", Hex.toHex(key), path), logPrefix);
-      }
-
-      long start = sopConfig.getStatistics().getRead().begin();
-      try {
-        HFileScanner seek = reader.getScanner(true, true);
-        if (seek.seekTo(key) == 0) {
-          ByteBuffer val = seek.getValue();
-          sopConfig.getStatistics().getRead().end(val.remaining(), start);
-          
-          return val;
-        }
-        
-        sopConfig.getStatistics().getRead().end(start);
-        sopConfig.getStatistics().getBloom().falsePositive();
-        return null;
-        
-      } catch (IOException e) {
-        sopConfig.getStatistics().getRead().error(start);
-        throw (IOException) e.fillInStackTrace();
-      }
-    }
-
-    @Override
-    public SortedIterator<ByteBuffer> scan(
-        byte[] from, boolean fromInclusive, 
-        byte[] to, boolean toInclusive,
-        boolean ascending,
-        MetadataFilter filter) throws IOException {
-      if (filter == null || filter.accept(getMetadata(filter.getName()))) {
-        SerializedComparator tmp = (SerializedComparator) reader.getComparator();
-        tmp = ascending ? tmp : ReversingSerializedComparator.reverse(tmp); 
-  
-//        HFileScanner scan = reader.getScanner(true, false, ascending, false);
-        HFileScanner scan = reader.getScanner(true, false, false);
-        return new HFileSortedIterator(scan, tmp, from, fromInclusive, to, toInclusive);
-      }
-      return new BufferIterator(Collections.<byte[], byte[]>emptyMap().entrySet().iterator());
-    }
-
-    @Override
-    public SerializedComparator getComparator() {
-      return (SerializedComparator) reader.getComparator();
-    }
-
-    @Override
-    public SortedStatistics getStatistics() {
-      return stats;
-    }
-
-    @Override
-    public boolean isClosed() {
-      return closed;
-    }
-    
-    @Override
-    public void close() throws IOException {
-      if (logger.isDebugEnabled()) {
-        logger.debug("{}Closing reader on " + path, logPrefix);
-      }
-      reader.close();
-      closed = true;
-    }
-
-    @Override
-    public BloomFilter getBloomFilter() {
-      return bloom;
-    }
-
-    @Override
-    public byte[] getMetadata(Metadata name) throws IOException {
-      assert name != null;
-      
-      return reader.loadFileInfo().get(name.bytes());
-    }
-    
-    @Override
-    public File getFile() {
-      return new File(path.toUri());
-    }
-    
-    @Override
-    public String getFileName() {
-      return path.getName();
-    }
-   
-    @Override
-    public long getModificationTimeStamp() throws IOException {
-      FileStatus[] stats = FSUtils.listStatus(fs, path, null);
-      if (stats != null && stats.length == 1) {
-        return stats[0].getModificationTime();
-      } else {
-        return 0;
-      }
-    }
-    
-    @Override
-    public void rename(String name) throws IOException {
-      Path parent = path.getParent();
-      Path newPath = new Path(parent, name);
-      fs.rename(path, newPath);
-      // update path to point to the new path
-      path = newPath;
-    }
-    
-    @Override
-    public void delete() throws IOException {
-      fs.delete(path, false);
-    }
-    
-    @Override
-    public String toString() {
-      return path.toString();
-    }
-    
-    private byte[] getMetadata(InternalMetadata name) throws IOException {
-      return reader.loadFileInfo().get(name.bytes());
-    }
-    
-    private void validate() throws IOException {
-      // check magic
-      byte[] magic = getMetadata(InternalMetadata.GEMFIRE_MAGIC);
-      if (!Arrays.equals(magic, MAGIC)) {
-        throw new IOException(LocalizedStrings.Soplog_INVALID_MAGIC.toLocalizedString(Hex.toHex(magic)));
-      }
-      
-      // check version compatibility
-      byte[] ver = getMetadata(InternalMetadata.VERSION);
-      if (logger.isDebugEnabled()) {
-        logger.debug("{}Soplog version is " + Hex.toHex(ver), logPrefix);
-      }
-      
-      if (!Arrays.equals(ver, VERSION_1)) {
-        throw new IOException(LocalizedStrings.Soplog_UNRECOGNIZED_VERSION.toLocalizedString(Hex.toHex(ver)));
-      }
-    }
-    
-    private void loadComparators(DelegatingSerializedComparator comparator) throws IOException {
-      byte[] raw = reader.loadFileInfo().get(InternalMetadata.COMPARATORS.bytes());
-      assert raw != null;
-
-      DataInput in = new DataInputStream(new ByteArrayInputStream(raw));
-      comparator.setComparators(readComparators(in));
-    }
-    
-    private SerializedComparator[] readComparators(DataInput in) throws IOException {
-      try {
-        SerializedComparator[] comps = new SerializedComparator[in.readInt()];
-        assert comps.length > 0;
-        
-        for (int i = 0; i < comps.length; i++) {
-          comps[i] = (SerializedComparator) Class.forName(in.readUTF()).newInstance();
-          if (comps[i] instanceof DelegatingSerializedComparator) {
-            ((DelegatingSerializedComparator) comps[i]).setComparators(readComparators(in));
-          }
-        }
-        return comps;
-        
-      } catch (Exception e) {
-        throw new IOException(e);
-      }
-    }
-  }
-  
-  private class HFileSortedOplogWriter implements SortedOplogWriter {
-    private final Writer writer;
-    private final BloomFilterWriter bfw;
-    
-    public HFileSortedOplogWriter() throws IOException {
-      writer = HFile.getWriterFactory(hconf, hcache)
-          .withPath(fs, path)
-          .withBlockSize(sopConfig.getBlockSize())
-          .withBytesPerChecksum(sopConfig.getBytesPerChecksum())
-          .withChecksumType(HFileSortedOplogFactory.convertChecksum(sopConfig.getChecksum()))
-//          .withComparator(sopConfig.getComparator())
-          .withCompression(HFileSortedOplogFactory.convertCompression(sopConfig.getCompression()))
-          .withDataBlockEncoder(HFileSortedOplogFactory.convertEncoding(sopConfig.getKeyEncoding()))
-          .create();
-      
-      bfw = sopConfig.isBloomFilterEnabled() ?
-//          BloomFilterFactory.createGeneralBloomAtWrite(hconf, hcache, BloomType.ROW, 
-//              0, writer, sopConfig.getComparator())
-          BloomFilterFactory.createGeneralBloomAtWrite(hconf, hcache, BloomType.ROW, 
-              0, writer)
-          : null;
-    }
-    
-    @Override
-    public void append(byte[] key, byte[] value) throws IOException {
-      assert key != null;
-      assert value != null;
-
-      if (logger.isTraceEnabled()) {
-        logger.trace(String.format("{}Appending key %s to %s", Hex.toHex(key), path), logPrefix);
-      }
-
-      try {
-        writer.append(key, value);
-        if (bfw != null) {
-          bfw.add(key, 0, key.length);
-        }
-      } catch (IOException e) {
-        throw (IOException) e.fillInStackTrace();
-      }
-    }
-
-    @Override
-    public void append(ByteBuffer key, ByteBuffer value) throws IOException {
-      assert key != null;
-      assert value != null;
-
-      if (logger.isTraceEnabled()) {
-        logger.trace(String.format("{}Appending key %s to %s", 
-            Hex.toHex(key.array(), key.arrayOffset(), key.remaining()), path), logPrefix);
-      }
-
-      try {
-        byte[] keyBytes = new byte[key.remaining()];
-        key.duplicate().get(keyBytes);
-        byte[] valueBytes = new byte[value.remaining()];
-        value.duplicate().get(valueBytes);
-        writer.append(keyBytes, valueBytes);
-        if (bfw != null) {
-          bfw.add(key.array(), key.arrayOffset(), key.remaining());
-        }
-      } catch (IOException e) {
-        throw (IOException) e.fillInStackTrace();
-      }
-    }
-    
-    @Override
-    public void close(EnumMap<Metadata, byte[]> metadata) throws IOException {
-      if (logger.isTraceEnabled()) {
-        logger.debug("{}Finalizing and closing writer on " + path, logPrefix);
-      }
-
-      if (bfw != null) {
-        bfw.compactBloom();
-        writer.addGeneralBloomFilter(bfw);
-      }
-      
-      // append system metadata
-      writer.appendFileInfo(InternalMetadata.GEMFIRE_MAGIC.bytes(), MAGIC);
-      writer.appendFileInfo(InternalMetadata.VERSION.bytes(), VERSION_1);
-      
-      // append comparator info
-//      if (writer.getComparator() instanceof DelegatingSerializedComparator) {
-//        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-//        DataOutput out = new DataOutputStream(bos);
-//        
-//        writeComparatorInfo(out, ((DelegatingSerializedComparator) writer.getComparator()).getComparators());
-//        writer.appendFileInfo(InternalMetadata.COMPARATORS.bytes(), bos.toByteArray());
-//      }
-      
-      // TODO write statistics data to soplog
-      // writer.appendFileInfo(Meta.STATISTICS.toBytes(), null);
-
-      // append user metadata
-      if (metadata != null) {
-        for (Entry<Metadata, byte[]> entry : metadata.entrySet()) {
-          writer.appendFileInfo(entry.getKey().name().getBytes(), entry.getValue());
-        }
-      }
-      
-      writer.close();
-    }
-    
-    @Override
-    public void closeAndDelete() throws IOException {
-      if (logger.isTraceEnabled()) {
-        logger.debug("{}Closing writer and deleting " + path, logPrefix);
-      }
-
-      writer.close();
-      new File(writer.getPath().toUri()).delete();
-    }
-    
-//    private void writeComparatorInfo(DataOutput out, SerializedComparator[] comparators) throws IOException {
-//      out.writeInt(comparators.length);
-//      for (SerializedComparator sc : comparators) {
-//        out.writeUTF(sc.getClass().getName());
-//        if (sc instanceof DelegatingSerializedComparator) {
-//          writeComparatorInfo(out, ((DelegatingSerializedComparator) sc).getComparators());
-//        }
-//      }
-//    }
-  }
-  
-  private class HFileSortedIterator implements SortedIterator<ByteBuffer> {
-    private final HFileScanner scan;
-    private final SerializedComparator comparator;
-    
-    private final byte[] from;
-    private final boolean fromInclusive;
-
-    private final byte[] to;
-    private final boolean toInclusive;
-    
-    private final long start;
-    private long bytes;
-    
-    private boolean foundNext;
-    
-    private ByteBuffer key;
-    private ByteBuffer value;
-    
-    public HFileSortedIterator(HFileScanner scan, SerializedComparator comparator, 
-        byte[] from, boolean fromInclusive, 
-        byte[] to, boolean toInclusive) throws IOException {
-      this.scan = scan;
-      this.comparator = comparator;
-      this.from = from;
-      this.fromInclusive = fromInclusive;
-      this.to = to;
-      this.toInclusive = toInclusive;
-      
-      assert from == null 
-          || to == null 
-          || comparator.compare(from, 0, from.length, to, 0, to.length) <= 0;
-      
-      start = sopConfig.getStatistics().getScan().begin();
-      foundNext = evalFrom();
-    }
-    
-    @Override
-    public ByteBuffer key() {
-      return key;
-    }
-    
-    @Override 
-    public ByteBuffer value() {
-      return value;
-    }
-
-    @Override
-    public boolean hasNext() {
-      if (!foundNext) {
-        foundNext = step();
-      }
-      return foundNext;
-    }
-    
-    @Override
-    public ByteBuffer next() {
-      long startNext = sopConfig.getStatistics().getScan().beginIteration();
-      
-      if (!hasNext()) {
-        throw new NoSuchElementException();
-      }
-      
-      foundNext = false;
-      key = scan.getKey();
-      value = scan.getValue();
-      
-      int len = key.remaining() + value.remaining(); 
-      bytes += len;
-      sopConfig.getStatistics().getScan().endIteration(len, startNext);
-      
-      return key;
-    }
-    
-    @Override
-    public void remove() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void close() {
-      sopConfig.getStatistics().getScan().end(bytes, start);
-    }
-
-    private boolean step() {
-      try {
-        if (!scan.isSeeked()) {
-          return false;
-          
-        } else  if (scan.next() && evalTo()) {
-          return true;
-        }
-      } catch (IOException e) {
-        throw new HDFSIOException("Error from HDFS during iteration", e);
-      }
-      return false;
-    }
-    
-    private boolean evalFrom() throws IOException {
-      if (from == null) {
-        return scan.seekTo() && evalTo();
-        
-      } else {
-        int compare = scan.seekTo(from);
-        if (compare < 0) {
-          return scan.seekTo() && evalTo();
-          
-        } else if (compare == 0 && fromInclusive) {
-          return true;
-          
-        } else {
-          return step();
-        }
-      }
-    }
-    
-    private boolean evalTo() throws IOException {
-      int compare = -1;
-      if (to != null) {
-        ByteBuffer key = scan.getKey();
-        compare = comparator.compare(
-            key.array(), key.arrayOffset(), key.remaining(), 
-            to, 0, to.length);
-      }
-
-      return compare < 0 || (compare == 0 && toInclusive);
-    }
-  }
-  
-  private static class HFileSortedStatistics implements SortedStatistics {
-    private final Reader reader;
-    private final int keySize;
-    private final int valueSize;
-    
-    public HFileSortedStatistics(Reader reader) throws IOException {
-      this.reader = reader;
-
-      byte[] sz = reader.loadFileInfo().get(AVG_KEY_LEN);
-      keySize = Bytes.toInt(sz[0], sz[1], sz[2], sz[3]);
-
-      sz = reader.loadFileInfo().get(AVG_VALUE_LEN);
-      valueSize = Bytes.toInt(sz[0], sz[1], sz[2], sz[3]);
-    }
-
-    @Override
-    public long keyCount() {
-      return reader.getEntries();
-    }
-
-    @Override
-    public byte[] firstKey() {
-      return reader.getFirstKey();
-    }
-
-    @Override
-    public byte[] lastKey() {
-      return reader.getLastKey();
-    }
-
-    @Override
-    public double avgKeySize() {
-      return keySize;
-    }
-    
-    @Override
-    public double avgValueSize() {
-      return valueSize;
-    }
-    
-    @Override
-    public void close() {
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9438c8b1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/HFileSortedOplogFactory.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/HFileSortedOplogFactory.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/HFileSortedOplogFactory.java
deleted file mode 100644
index 9546fd3..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/HFileSortedOplogFactory.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog.hfile;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.io.hfile.BlockCache;
-import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
-import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
-import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
-import org.apache.hadoop.hbase.util.ChecksumType;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.HFileStoreStatistics;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplog;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogFactory;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogFactory.SortedOplogConfiguration.Checksum;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogFactory.SortedOplogConfiguration.Compression;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogFactory.SortedOplogConfiguration.KeyEncoding;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics;
-
-/**
- * Creates HFile soplogs.
- * 
- * @author bakera
- */
-public class HFileSortedOplogFactory implements SortedOplogFactory {
-  private final SortedOplogConfiguration config;
-  
-  public HFileSortedOplogFactory(String name, BlockCache blockCache, SortedOplogStatistics stats, HFileStoreStatistics storeStats) {
-    config = new SortedOplogConfiguration(name, blockCache, stats, storeStats);
-  }
-  
-  @Override
-  public SortedOplogConfiguration getConfiguration() {
-    return config;
-  }
-
-  @Override
-  public SortedOplog createSortedOplog(File name) throws IOException {
-    return new HFileSortedOplog(name, config);
-  }
-  
-  public static ChecksumType convertChecksum(Checksum type) {
-    switch (type) {
-    case NONE:  return ChecksumType.NULL;
-    
-    default:
-    case CRC32: return ChecksumType.CRC32;
-    }
-  }
-
-  public static Algorithm convertCompression(Compression type) {
-    switch (type) {
-    default:
-    case NONE: return Algorithm.NONE;
-    }
-  }
-  
-  public static HFileDataBlockEncoder convertEncoding(KeyEncoding type) {
-    switch (type) {
-    default:
-    case NONE: return NoOpDataBlockEncoder.INSTANCE;
-    }
-  }
-}