Posted to commits@geode.apache.org by up...@apache.org on 2016/04/27 22:49:58 UTC

[12/25] incubator-geode git commit: GEODE-10: Reinstating HDFS persistence code

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/CursorIterator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/CursorIterator.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/CursorIterator.java
new file mode 100644
index 0000000..dacc208
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/CursorIterator.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.persistence.soplog;
+
+import java.util.Iterator;
+
+/**
+ * Provides an {@link Iterator} that allows access to the current iteration
+ * element.  The implementor must provide access to the current element
+ * as well as a means to move to the next element.
+ * 
+ *
+ * @param <E> the element type
+ */
+public interface CursorIterator<E> extends Iterator<E> {
+  /**
+   * Returns the element at the current position.
+   * @return the current element
+   */
+  E current();
+  
+  /**
+   * Provides an iteration cursor by wrapping an {@link Iterator}.
+   *
+   * @param <E> the element type
+   */
+  public static class WrappedIterator<E> implements CursorIterator<E> {
+    /** the underlying iterator */
+    private final Iterator<E> src;
+    
+    /** the current iteration element */
+    private E current;
+    
+    public WrappedIterator(Iterator<E> src) {
+      this.src = src;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return src.hasNext();
+    }
+
+    @Override
+    public E next() {
+      current = src.next();
+      return current;
+    }
+
+    @Override
+    public E current() {
+      return current;
+    }
+    
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+    
+    /**
+     * Returns the unwrapped iterator.
+     * @return the iterator
+     */
+    public Iterator<E> unwrap() {
+      return src;
+    }
+  }
+}
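
For reference, a minimal sketch of how the wrapper might be used; the sample data and class name are illustrative, not part of the commit:

import java.util.Arrays;
import java.util.Iterator;

public class CursorIteratorExample {
  public static void main(String[] args) {
    Iterator<String> src = Arrays.asList("a", "b", "c").iterator();
    CursorIterator.WrappedIterator<String> cursor =
        new CursorIterator.WrappedIterator<String>(src);

    while (cursor.hasNext()) {
      cursor.next();
      // current() keeps returning the element most recently returned
      // by next(), without advancing the iteration.
      System.out.println(cursor.current());
    }
  }
}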

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/DelegatingSerializedComparator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/DelegatingSerializedComparator.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/DelegatingSerializedComparator.java
new file mode 100644
index 0000000..52470d0
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/DelegatingSerializedComparator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.persistence.soplog;
+
+import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SerializedComparator;
+
+/**
+ * Delegates object comparisons to one or more embedded comparators.
+ *  
+ */
+public interface DelegatingSerializedComparator extends SerializedComparator {
+  /**
+   * Injects the embedded comparators.
+   * @param comparators the comparators for delegation
+   */
+  void setComparators(SerializedComparator[] comparators);
+  
+  /**
+   * Returns the embedded comparators.
+   * @return the comparators
+   */
+  SerializedComparator[] getComparators();
+}
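
A hypothetical implementation sketch, assuming the delegates are consulted in order until one reports a difference; a real implementation for compound keys would first split the serialized key into its component fields, which is elided here:

import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SerializedComparator;

public class OrderedDelegatingComparator implements DelegatingSerializedComparator {
  private volatile SerializedComparator[] comparators = new SerializedComparator[0];

  @Override
  public void setComparators(SerializedComparator[] comparators) {
    this.comparators = comparators;
  }

  @Override
  public SerializedComparator[] getComparators() {
    return comparators;
  }

  @Override
  public int compare(byte[] o1, byte[] o2) {
    return compare(o1, 0, o1.length, o2, 0, o2.length);
  }

  @Override
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    // Consult each delegate in turn; the first non-zero result wins.
    for (SerializedComparator sc : comparators) {
      int diff = sc.compare(b1, s1, l1, b2, s2, l2);
      if (diff != 0) {
        return diff;
      }
    }
    return 0;
  }
}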

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/HFileStoreStatistics.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/HFileStoreStatistics.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/HFileStoreStatistics.java
new file mode 100644
index 0000000..fdf3852
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/HFileStoreStatistics.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.persistence.soplog;
+
+import static com.gemstone.gemfire.distributed.internal.DistributionStats.getStatTime;
+
+import com.gemstone.gemfire.StatisticDescriptor;
+import com.gemstone.gemfire.Statistics;
+import com.gemstone.gemfire.StatisticsFactory;
+import com.gemstone.gemfire.StatisticsType;
+import com.gemstone.gemfire.StatisticsTypeFactory;
+import com.gemstone.gemfire.internal.DummyStatisticsFactory;
+import com.gemstone.gemfire.internal.StatisticsTypeFactoryImpl;
+
+public class HFileStoreStatistics {
+  private final Statistics stats;
+  
+  private final CacheOperation blockCache;
+  
+  public HFileStoreStatistics(String typeName, String name) {
+    this(new DummyStatisticsFactory(), typeName, name);
+  }
+  
+  public HFileStoreStatistics(StatisticsFactory factory, String typeName, String name) {
+    StatisticsTypeFactory tf = StatisticsTypeFactoryImpl.singleton();
+    
+    StatisticDescriptor bcMisses = tf.createLongCounter("blockCacheMisses", "The total number of block cache misses", "misses");
+    StatisticDescriptor bcHits = tf.createLongCounter("blockCacheHits", "The total number of block cache hits", "hits");
+    StatisticDescriptor bcCached = tf.createLongGauge("blocksCached", "The current number of cached blocks", "blocks");
+    StatisticDescriptor bcBytesCached = tf.createLongGauge("blockBytesCached", "The current number of bytes cached", "bytes");
+    StatisticDescriptor bcBytesEvicted = tf.createLongCounter("blockBytesEvicted", "The total number of bytes evicted", "bytes");
+
+    
+    StatisticsType type = tf.createType(typeName, 
+        "Statistics about structured I/O operations for a region", new StatisticDescriptor[] {
+        bcMisses, bcHits, bcCached, bcBytesCached, bcBytesEvicted
+    });
+
+    blockCache = new CacheOperation(bcMisses.getId(), bcHits.getId(), bcCached.getId(), bcBytesCached.getId(), bcBytesEvicted.getId());
+
+
+    stats = factory.createAtomicStatistics(type, name);
+  }
+
+  public void close() {
+    stats.close();
+  }
+  
+  public Statistics getStats() {
+    return stats;
+  }
+  
+  public CacheOperation getBlockCache() {
+    return blockCache;
+  }
+  
+  
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("blockCache = {").append(blockCache).append("}\n");
+    
+    return sb.toString();
+  }
+  
+  public class TimedOperation {
+    protected final int countId;
+    protected final int inProgressId;
+    protected final int timeId;
+    private final int errorsId;
+    
+    public TimedOperation(int count, int inProgress, int time, int errors) {
+      this.countId = count;
+      this.inProgressId = inProgress;
+      this.timeId = time;
+      this.errorsId = errors;
+    }
+    
+    public long begin() {
+      stats.incLong(inProgressId, 1);
+      return getStatTime();
+    }
+    
+    public long end(long start) {
+      stats.incLong(inProgressId, -1);
+      stats.incLong(countId, 1);
+      stats.incLong(timeId, getStatTime() - start);
+      return getStatTime();
+    }
+    
+    public void error(long start) {
+      end(start);
+      stats.incLong(errorsId, 1);
+    }
+    
+    public long getCount() {
+      return stats.getLong(countId);
+    }
+    
+    public long getInProgress() {
+      return stats.getLong(inProgressId);
+    }
+    
+    public long getTime() {
+      return stats.getLong(timeId);
+    }
+    
+    public long getErrors() {
+      return stats.getLong(errorsId);
+    }
+    
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("count=").append(getCount());
+      sb.append(";inProgress=").append(getInProgress());
+      sb.append(";errors=").append(getErrors());
+      sb.append(";time=").append(getTime());
+      
+      return sb.toString();
+    }
+  }
+  
+  public class CacheOperation {
+    private final int missesId;
+    private final int hitsId;
+    private final int cachedId;
+    private final int bytesCachedId;
+    private final int bytesEvictedId;
+    
+    public CacheOperation(int missesId, int hitsId, int cachedId, 
+        int bytesCachedId, int bytesEvictedId) {
+      this.missesId = missesId;
+      this.hitsId = hitsId;
+      this.cachedId = cachedId;
+      this.bytesCachedId = bytesCachedId;
+      this.bytesEvictedId = bytesEvictedId;
+    }
+    
+    public void store(long bytes) {
+      stats.incLong(cachedId, 1);
+      stats.incLong(bytesCachedId, bytes);
+    }
+    
+    public void evict(long bytes) {
+      stats.incLong(cachedId, -1);
+      stats.incLong(bytesCachedId, -bytes);
+      stats.incLong(bytesEvictedId, bytes);
+    }
+    
+    public void hit() {
+      stats.incLong(hitsId, 1);
+    }
+    
+    public void miss() {
+      stats.incLong(missesId, 1);
+    }
+    
+    public long getMisses() {
+      return stats.getLong(missesId);
+    }
+    
+    public long getHits() {
+      return stats.getLong(hitsId);
+    }
+    
+    public long getCached() {
+      return stats.getLong(cachedId);
+    }
+    
+    public long getBytesCached() {
+      return stats.getLong(bytesCachedId);
+    }
+    
+    public long getBytesEvicted() {
+      return stats.getLong(bytesEvictedId);
+    }
+    
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("misses=").append(getMisses());
+      sb.append(";hits=").append(getHits());
+      sb.append(";cached=").append(getCached());
+      sb.append(";bytesCached=").append(getBytesCached());
+      sb.append(";bytesEvicted=").append(getBytesEvicted());
+      
+      return sb.toString();
+    }
+  }
+}
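
A short usage sketch of the block cache accounting; the type and instance names ("HFileStore", "store1") are made up for illustration:

public class BlockCacheStatsExample {
  public static void main(String[] args) {
    HFileStoreStatistics stats = new HFileStoreStatistics("HFileStore", "store1");
    HFileStoreStatistics.CacheOperation bc = stats.getBlockCache();

    bc.store(4096);   // a 4 KB block enters the cache: blocksCached +1, bytesCached +4096
    bc.hit();         // a read served from the block cache
    bc.miss();        // a read that had to go to disk
    bc.evict(4096);   // blocksCached -1, bytesCached -4096, blockBytesEvicted +4096

    // Prints: blockCache = {misses=1;hits=1;cached=0;bytesCached=0;bytesEvicted=4096}
    System.out.println(stats);
    stats.close();
  }
}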

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/KeyValueIterator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/KeyValueIterator.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/KeyValueIterator.java
new file mode 100644
index 0000000..df7e1ac
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/KeyValueIterator.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.persistence.soplog;
+
+import java.util.Iterator;
+
+/**
+ * Provides an {@link Iterator} view over a collection of keys and values.  The
+ * implementor must provide access to the current key/value as well as a means
+ * to move to the next pair.
+ * 
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public interface KeyValueIterator<K, V> extends Iterator<K> {
+  /**
+   * Returns the key at the current position.
+   * @return the key
+   */
+  public K key();
+  
+  /**
+   * Returns the value at the current position.
+   * @return the value
+   */
+  public V value();
+}
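
As a sketch of the contract, a hypothetical adapter over a java.util.Map, where next() advances to and returns the next key while key() and value() expose the current pair:

import java.util.Iterator;
import java.util.Map;

public class MapKeyValueIterator<K, V> implements KeyValueIterator<K, V> {
  private final Iterator<Map.Entry<K, V>> entries;
  private Map.Entry<K, V> current;

  public MapKeyValueIterator(Map<K, V> map) {
    this.entries = map.entrySet().iterator();
  }

  @Override
  public boolean hasNext() {
    return entries.hasNext();
  }

  @Override
  public K next() {
    current = entries.next();
    return current.getKey();
  }

  @Override
  public K key() {
    return current.getKey();
  }

  @Override
  public V value() {
    return current.getValue();
  }

  @Override
  public void remove() {
    throw new UnsupportedOperationException();
  }
}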

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogStatistics.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogStatistics.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogStatistics.java
new file mode 100644
index 0000000..35baafb
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogStatistics.java
@@ -0,0 +1,505 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.persistence.soplog;
+
+import static com.gemstone.gemfire.distributed.internal.DistributionStats.getStatTime;
+
+import com.gemstone.gemfire.StatisticDescriptor;
+import com.gemstone.gemfire.Statistics;
+import com.gemstone.gemfire.StatisticsFactory;
+import com.gemstone.gemfire.StatisticsType;
+import com.gemstone.gemfire.StatisticsTypeFactory;
+import com.gemstone.gemfire.internal.DummyStatisticsFactory;
+import com.gemstone.gemfire.internal.StatisticsTypeFactoryImpl;
+
+public class SortedOplogStatistics {
+  private final Statistics stats;
+  
+  private final IOOperation read;
+  private final ScanOperation scan;
+  private final IOOperation write;
+  private final IOOperation put;
+  private final IOOperation flush;
+  private final IOOperation minorCompaction;
+  private final IOOperation majorCompaction;
+  private final BloomOperation bloom;
+  private final TimedOperation clear;
+  private final TimedOperation destroy;
+  
+  private final IOOperation blockRead;
+  private final CacheOperation blockCache;
+  
+  private final int activeFilesId;
+  private final int inactiveFilesId;
+  private final int activeReadersId;
+  
+  private final int storeUsageBytesId;
+
+  public SortedOplogStatistics(String typeName, String name) {
+    this(new DummyStatisticsFactory(), typeName, name);
+  }
+  
+  public SortedOplogStatistics(StatisticsFactory factory, String typeName, String name) {
+    StatisticsTypeFactory tf = StatisticsTypeFactoryImpl.singleton();
+    
+    StatisticDescriptor readCount = tf.createLongCounter("reads", "The total number of read operations", "ops");
+    StatisticDescriptor readInProgress = tf.createLongGauge("readsInProgress", "The number of read operations in progress", "ops");
+    StatisticDescriptor readTime = tf.createLongCounter("readTime", "The total time spent reading from disk", "nanoseconds");
+    StatisticDescriptor readBytes = tf.createLongCounter("readBytes", "The total number of bytes read from disk", "bytes");
+    StatisticDescriptor readErrors = tf.createLongCounter("readErrors", "The total number of read errors", "errors");
+
+    StatisticDescriptor scanCount = tf.createLongCounter("scans", "The total number of scan operations", "ops");
+    StatisticDescriptor scanInProgress = tf.createLongGauge("scansInProgress", "The number of scan operations in progress", "ops");
+    StatisticDescriptor scanTime = tf.createLongCounter("scanTime", "The total time scanner was operational", "nanoseconds");
+    StatisticDescriptor scanBytes = tf.createLongCounter("scanBytes", "The total number of bytes scanned from disk", "bytes");
+    StatisticDescriptor scanErrors = tf.createLongCounter("scanErrors", "The total number of scan errors", "errors");
+    StatisticDescriptor scanIterations = tf.createLongCounter("scanIterations", "The total number of scan iterations", "ops");
+    StatisticDescriptor scanIterationTime = tf.createLongCounter("scanIterationTime", "The total time spent scanning from persistence layer", "nanoseconds");
+
+    StatisticDescriptor writeCount = tf.createLongCounter("writes", "The total number of write operations", "ops");
+    StatisticDescriptor writeInProgress = tf.createLongGauge("writesInProgress", "The number of write operations in progress", "ops");
+    StatisticDescriptor writeTime = tf.createLongCounter("writeTime", "The total time spent writing to disk", "nanoseconds");
+    StatisticDescriptor writeBytes = tf.createLongCounter("writeBytes", "The total number of bytes written to disk", "bytes");
+    StatisticDescriptor writeErrors = tf.createLongCounter("writeErrors", "The total number of write errors", "errors");
+
+    StatisticDescriptor putCount = tf.createLongCounter("puts", "The total number of put operations", "ops");
+    StatisticDescriptor putInProgress = tf.createLongGauge("putsInProgress", "The number of put operations in progress", "ops");
+    StatisticDescriptor putTime = tf.createLongCounter("putTime", "The total time spent in put calls", "nanoseconds");
+    StatisticDescriptor putBytes = tf.createLongCounter("putBytes", "The total number of bytes put", "bytes");
+    StatisticDescriptor putErrors = tf.createLongCounter("putErrors", "The total number of put errors", "errors");
+
+    StatisticDescriptor flushCount = tf.createLongCounter("flushes", "The total number of flush operations", "ops");
+    StatisticDescriptor flushInProgress = tf.createLongGauge("flushesInProgress", "The number of flush operations in progress", "ops");
+    StatisticDescriptor flushTime = tf.createLongCounter("flushTime", "The total time spent flushing to disk", "nanoseconds");
+    StatisticDescriptor flushBytes = tf.createLongCounter("flushBytes", "The total number of bytes flushed to disk", "bytes");
+    StatisticDescriptor flushErrors = tf.createLongCounter("flushErrors", "The total number of flush errors", "errors");
+
+    StatisticDescriptor minorCompactionCount = tf.createLongCounter("minorCompactions", "The total number of minor compaction operations", "ops");
+    StatisticDescriptor minorCompactionInProgress = tf.createLongGauge("minorCompactionsInProgress", "The number of minor compaction operations in progress", "ops");
+    StatisticDescriptor minorCompactionTime = tf.createLongCounter("minorCompactionTime", "The total time spent in minor compactions", "nanoseconds");
+    StatisticDescriptor minorCompactionBytes = tf.createLongCounter("minorCompactionBytes", "The total number of bytes collected during minor compactions", "bytes");
+    StatisticDescriptor minorCompactionErrors = tf.createLongCounter("minorCompactionErrors", "The total number of minor compaction errors", "errors");
+
+    StatisticDescriptor majorCompactionCount = tf.createLongCounter("majorCompactions", "The total number of major compaction operations", "ops");
+    StatisticDescriptor majorCompactionInProgress = tf.createLongGauge("majorCompactionsInProgress", "The number of major compaction operations in progress", "ops");
+    StatisticDescriptor majorCompactionTime = tf.createLongCounter("majorCompactionTime", "The total time spent in major compactions", "nanoseconds");
+    StatisticDescriptor majorCompactionBytes = tf.createLongCounter("majorCompactionBytes", "The total number of bytes collected during major compactions", "bytes");
+    StatisticDescriptor majorCompactionErrors = tf.createLongCounter("majorCompactionErrors", "The total number of major compaction errors", "errors");
+
+    StatisticDescriptor bloomCount = tf.createLongCounter("bloomFilterCheck", "The total number of Bloom Filter checks", "ops");
+    StatisticDescriptor bloomInProgress = tf.createLongGauge("bloomFilterChecksInProgress", "The number of Bloom Filter checks in progress", "ops");
+    StatisticDescriptor bloomTime = tf.createLongCounter("bloomFilterCheckTime", "The total time spent checking the Bloom Filter", "nanoseconds");
+    StatisticDescriptor bloomErrors = tf.createLongCounter("bloomFilterErrors", "The total number of Bloom Filter errors", "errors");
+    StatisticDescriptor bloomFalsePositive = tf.createLongCounter("bloomFilterFalsePositives", "The total number of Bloom Filter false positives", "false positives");
+
+    StatisticDescriptor clearCount = tf.createLongCounter("clears", "The total number of clear operations", "ops");
+    StatisticDescriptor clearInProgress = tf.createLongGauge("clearsInProgress", "The number of clear operations in progress", "ops");
+    StatisticDescriptor clearTime = tf.createLongCounter("clearTime", "The total time spent in clear operations", "nanoseconds");
+    StatisticDescriptor clearErrors = tf.createLongGauge("clearErrors", "The total number of clear errors", "errors");
+
+    StatisticDescriptor destroyCount = tf.createLongCounter("destroys", "The total number of destroy operations", "ops");
+    StatisticDescriptor destroyInProgress = tf.createLongGauge("destroysInProgress", "The number of destroy operations in progress", "ops");
+    StatisticDescriptor destroyTime = tf.createLongCounter("destroyTime", "The total time spent in destroy operations", "nanoseconds");
+    StatisticDescriptor destroyErrors = tf.createLongGauge("destroyErrors", "The total number of destroy errors", "errors");
+
+    StatisticDescriptor brCount = tf.createLongCounter("blockReads", "The total number of block read operations", "ops");
+    StatisticDescriptor brInProgress = tf.createLongGauge("blockReadsInProgress", "The number of block read operations in progress", "ops");
+    StatisticDescriptor brTime = tf.createLongCounter("blockReadTime", "The total time spent reading blocks from disk", "nanoseconds");
+    StatisticDescriptor brBytes = tf.createLongCounter("blockReadBytes", "The total number of block bytes read from disk", "bytes");
+    StatisticDescriptor brErrors = tf.createLongCounter("blockReadErrors", "The total number of block read errors", "errors");
+
+    StatisticDescriptor bcMisses = tf.createLongCounter("blockCacheMisses", "The total number of block cache misses", "misses");
+    StatisticDescriptor bcHits = tf.createLongCounter("blockCacheHits", "The total number of block cache hits", "hits");
+    StatisticDescriptor bcCached = tf.createLongGauge("blocksCached", "The current number of cached blocks", "blocks");
+    StatisticDescriptor bcBytesCached = tf.createLongGauge("blockBytesCached", "The current number of bytes cached", "bytes");
+    StatisticDescriptor bcBytesEvicted = tf.createLongCounter("blockBytesEvicted", "The total number of bytes evicted", "bytes");
+
+    StatisticDescriptor activeFileCount = tf.createLongGauge("activeFileCount", "The total number of active files", "files");
+    StatisticDescriptor inactiveFileCount = tf.createLongGauge("inactiveFileCount", "The total number of inactive files", "files");
+    StatisticDescriptor activeReaderCount = tf.createLongGauge("activeReaderCount", "The total number of active file readers", "files");
+    
+    StatisticDescriptor storeUsageBytes = tf.createLongGauge("storeUsageBytes", "The total volume occupied on persistent store", "bytes");
+    
+    StatisticsType type = tf.createType(typeName, 
+        "Statistics about structured I/O operations for a region", new StatisticDescriptor[] {
+        readCount, readInProgress, readTime, readBytes, readErrors,
+        scanCount, scanInProgress, scanTime, scanBytes, scanErrors, scanIterations, scanIterationTime,
+        writeCount, writeInProgress, writeTime, writeBytes, writeErrors,
+        putCount, putInProgress, putTime, putBytes, putErrors,
+        flushCount, flushInProgress, flushTime, flushBytes, flushErrors,
+        minorCompactionCount, minorCompactionInProgress, minorCompactionTime, minorCompactionBytes, minorCompactionErrors,
+        majorCompactionCount, majorCompactionInProgress, majorCompactionTime, majorCompactionBytes, majorCompactionErrors,
+        bloomCount, bloomInProgress, bloomTime, bloomErrors, bloomFalsePositive,
+        clearCount, clearInProgress, clearTime, clearErrors,
+        destroyCount, destroyInProgress, destroyTime, destroyErrors,
+        brCount, brInProgress, brTime, brBytes, brErrors,
+        bcMisses, bcHits, bcCached, bcBytesCached, bcBytesEvicted,
+        activeFileCount, inactiveFileCount, activeReaderCount, storeUsageBytes
+    });
+
+    read = new IOOperation(readCount.getId(), readInProgress.getId(), readTime.getId(), readBytes.getId(), readErrors.getId());
+    scan = new ScanOperation(scanCount.getId(), scanInProgress.getId(), scanTime.getId(), scanBytes.getId(), scanErrors.getId(), scanIterations.getId(), scanIterationTime.getId());    
+    write = new IOOperation(writeCount.getId(), writeInProgress.getId(), writeTime.getId(), writeBytes.getId(), writeErrors.getId());
+    put = new IOOperation(putCount.getId(), putInProgress.getId(), putTime.getId(), putBytes.getId(), putErrors.getId());
+    flush = new IOOperation(flushCount.getId(), flushInProgress.getId(), flushTime.getId(), flushBytes.getId(), flushErrors.getId());
+    minorCompaction = new IOOperation(minorCompactionCount.getId(), minorCompactionInProgress.getId(), minorCompactionTime.getId(), minorCompactionBytes.getId(), minorCompactionErrors.getId());
+    majorCompaction = new IOOperation(majorCompactionCount.getId(), majorCompactionInProgress.getId(), majorCompactionTime.getId(), majorCompactionBytes.getId(), majorCompactionErrors.getId());
+    bloom = new BloomOperation(bloomCount.getId(), bloomInProgress.getId(), bloomTime.getId(), bloomErrors.getId(), bloomFalsePositive.getId());
+    clear = new TimedOperation(clearCount.getId(), clearInProgress.getId(), clearTime.getId(), clearErrors.getId());
+    destroy = new TimedOperation(destroyCount.getId(), destroyInProgress.getId(), destroyTime.getId(), destroyErrors.getId());
+    
+    blockRead = new IOOperation(brCount.getId(), brInProgress.getId(), brTime.getId(), brBytes.getId(), brErrors.getId());
+    blockCache = new CacheOperation(bcMisses.getId(), bcHits.getId(), bcCached.getId(), bcBytesCached.getId(), bcBytesEvicted.getId());
+
+    activeFilesId = activeFileCount.getId();
+    inactiveFilesId = inactiveFileCount.getId();
+    activeReadersId = activeReaderCount.getId();
+    storeUsageBytesId = storeUsageBytes.getId();
+
+    stats = factory.createAtomicStatistics(type, name);
+  }
+
+  public void close() {
+    stats.close();
+  }
+  
+  public Statistics getStats() {
+    return stats;
+  }
+  
+  public IOOperation getRead() {
+    return read;
+  }
+  
+  public ScanOperation getScan() {
+    return scan;
+  }
+  
+  public IOOperation getWrite() {
+    return write;
+  }
+  
+  public IOOperation getPut() {
+    return put;
+  }
+  
+  public IOOperation getFlush() {
+    return flush;
+  }
+  
+  public IOOperation getMinorCompaction() {
+    return minorCompaction;
+  }
+  
+  public IOOperation getMajorCompaction() {
+    return majorCompaction;
+  }
+  
+  public BloomOperation getBloom() {
+    return bloom;
+  }
+  
+  public TimedOperation getClear() {
+    return clear;
+  }
+  
+  public TimedOperation getDestroy() {
+    return destroy;
+  }
+
+  public IOOperation getBlockRead() {
+    return blockRead;
+  }
+  
+  public CacheOperation getBlockCache() {
+    return blockCache;
+  }
+  
+  public long getActiveFileCount() {
+    return stats.getLong(activeFilesId);
+  }
+  
+  public long getInactiveFileCount() {
+    return stats.getLong(inactiveFilesId);
+  }
+  
+  public long getActiveReaderCount() {
+    return stats.getLong(activeReadersId);
+  }
+  
+  public void incActiveFiles(int amt) {
+    stats.incLong(activeFilesId, amt);
+    assert stats.getLong(activeFilesId) >= 0;
+  }
+  
+  public void incInactiveFiles(int amt) {
+    stats.incLong(inactiveFilesId, amt);
+    assert stats.getLong(inactiveFilesId) >= 0;
+  }
+  
+  public void incActiveReaders(int amt) {
+    stats.incLong(activeReadersId, amt);
+    assert stats.getLong(activeReadersId) >= 0;
+  }
+  
+  public long getStoreUsageBytes() {
+    return stats.getLong(storeUsageBytesId);
+  }
+  
+  public void incStoreUsageBytes(long amt) {
+    stats.incLong(storeUsageBytesId, amt);
+    assert stats.getLong(storeUsageBytesId) >= 0;
+  }
+  
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("read = {").append(read).append("}\n");
+    sb.append("scan = {").append(scan).append("}\n");
+    sb.append("write = {").append(write).append("}\n");
+    sb.append("put = {").append(put).append("}\n");
+    sb.append("flush = {").append(flush).append("}\n");
+    sb.append("minorCompaction = {").append(minorCompaction).append("}\n");
+    sb.append("majorCompaction = {").append(majorCompaction).append("}\n");
+    sb.append("bloom = {").append(bloom).append("}\n");
+    sb.append("clear = {").append(clear).append("}\n");
+    sb.append("destroy = {").append(destroy).append("}\n");
+    sb.append("blockRead = {").append(blockRead).append("}\n");
+    sb.append("blockCache = {").append(blockCache).append("}\n");
+    sb.append("activeFiles = ").append(stats.getLong(activeFilesId)).append("\n");
+    sb.append("inactiveFiles = ").append(stats.getLong(inactiveFilesId)).append("\n");
+    sb.append("activeReaders = ").append(stats.getLong(activeReadersId)).append("\n");
+    sb.append("storeUsageBytes = ").append(stats.getLong(storeUsageBytesId)).append("\n");
+    
+    return sb.toString();
+  }
+  
+  public class TimedOperation {
+    protected final int countId;
+    protected final int inProgressId;
+    protected final int timeId;
+    private final int errorsId;
+    
+    public TimedOperation(int count, int inProgress, int time, int errors) {
+      this.countId = count;
+      this.inProgressId = inProgress;
+      this.timeId = time;
+      this.errorsId = errors;
+    }
+    
+    public long begin() {
+      stats.incLong(inProgressId, 1);
+      return getStatTime();
+    }
+    
+    public long end(long start) {
+      stats.incLong(inProgressId, -1);
+      stats.incLong(countId, 1);
+      stats.incLong(timeId, getStatTime() - start);
+      return getStatTime();
+    }
+    
+    public void error(long start) {
+      end(start);
+      stats.incLong(errorsId, 1);
+    }
+    
+    public long getCount() {
+      return stats.getLong(countId);
+    }
+    
+    public long getInProgress() {
+      return stats.getLong(inProgressId);
+    }
+    
+    public long getTime() {
+      return stats.getLong(timeId);
+    }
+    
+    public long getErrors() {
+      return stats.getLong(errorsId);
+    }
+    
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("count=").append(getCount());
+      sb.append(";inProgress=").append(getInProgress());
+      sb.append(";errors=").append(getErrors());
+      sb.append(";time=").append(getTime());
+      
+      return sb.toString();
+    }
+  }
+  
+  public class IOOperation extends TimedOperation {
+    protected final int bytesId;
+    
+    public IOOperation(int count, int inProgress, int time, int bytes, int errors) {
+      super(count, inProgress, time, errors);
+      this.bytesId = bytes;
+    }
+    
+    public long end(long bytes, long start) {
+      stats.incLong(bytesId, bytes);
+      return super.end(start);
+    }
+    
+    public long getBytes() {
+      return stats.getLong(bytesId);
+    }
+    
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder(super.toString());
+      sb.append(";bytes=").append(getBytes());
+      
+      return sb.toString();
+    }
+  }
+
+  public class ScanOperation extends IOOperation {
+    private final int iterationsId;
+    private final int iterationTimeId;
+
+    public ScanOperation(int count, int inProgress, int time, int bytes, int errors, int iterCount, int iterTime) {
+      super(count, inProgress, time, bytes, errors);
+      iterationsId = iterCount;
+      iterationTimeId = iterTime;
+    }
+    
+    public long beginIteration() {
+      return getStatTime();
+    }
+    
+    public void endIteration(long bytes, long start){
+      stats.incLong(iterationsId, 1);
+      stats.incLong(bytesId, bytes);
+      stats.incLong(iterationTimeId, getStatTime() - start);
+    }
+    
+    public long getIterations() {
+      return stats.getLong(iterationsId);
+    }
+    
+    public long getIterationTime() {
+      return stats.getLong(iterationTimeId);
+    }
+    
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder(super.toString());
+      sb.append(";iterations=").append(getIterations());
+      sb.append(";iterationTime=").append(getIterationTime());
+      
+      return sb.toString();
+    }
+  }
+
+  public class BloomOperation extends TimedOperation {
+    private final int falsePositiveId;
+    
+    public BloomOperation(int count, int inProgress, int time, int errors, int falsePositive) {
+      super(count, inProgress, time, errors);
+      this.falsePositiveId = falsePositive;
+    }
+    
+    public void falsePositive() {
+      stats.incLong(falsePositiveId, 1);
+    }
+    
+    public long getFalsePositives() {
+      return stats.getLong(falsePositiveId);
+    }
+    
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder(super.toString());
+      sb.append(";falsePositives=").append(getFalsePositives());
+      
+      return sb.toString();
+    }
+  }
+  
+  public class CacheOperation {
+    private final int missesId;
+    private final int hitsId;
+    private final int cachedId;
+    private final int bytesCachedId;
+    private final int bytesEvictedId;
+    
+    public CacheOperation(int missesId, int hitsId, int cachedId, 
+        int bytesCachedId, int bytesEvictedId) {
+      this.missesId = missesId;
+      this.hitsId = hitsId;
+      this.cachedId = cachedId;
+      this.bytesCachedId = bytesCachedId;
+      this.bytesEvictedId = bytesEvictedId;
+    }
+    
+    public void store(long bytes) {
+      stats.incLong(cachedId, 1);
+      stats.incLong(bytesCachedId, bytes);
+    }
+    
+    public void evict(long bytes) {
+      stats.incLong(cachedId, -1);
+      stats.incLong(bytesCachedId, -bytes);
+      stats.incLong(bytesEvictedId, bytes);
+    }
+    
+    public void hit() {
+      stats.incLong(hitsId, 1);
+    }
+    
+    public void miss() {
+      stats.incLong(missesId, 1);
+    }
+    
+    public long getMisses() {
+      return stats.getLong(missesId);
+    }
+    
+    public long getHits() {
+      return stats.getLong(hitsId);
+    }
+    
+    public long getCached() {
+      return stats.getLong(cachedId);
+    }
+    
+    public long getBytesCached() {
+      return stats.getLong(bytesCachedId);
+    }
+    
+    public long getBytesEvicted() {
+      return stats.getLong(bytesEvictedId);
+    }
+    
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("misses=").append(getMisses());
+      sb.append(";hits=").append(getHits());
+      sb.append(";cached=").append(getCached());
+      sb.append(";bytesCached=").append(getBytesCached());
+      sb.append(";bytesEvicted=").append(getBytesEvicted());
+      
+      return sb.toString();
+    }
+  }
+}
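
The timed operations follow a begin/end/error pattern. A minimal sketch, where doFlush() is a stand-in for the real flush and not part of the commit:

import java.io.IOException;

public class FlushTimingExample {
  // Stand-in for the real flush; pretend 1 KB was written.
  static long doFlush() throws IOException {
    return 1024L;
  }

  public static void main(String[] args) {
    SortedOplogStatistics stats =
        new SortedOplogStatistics("SortedOplogStats", "region1");
    SortedOplogStatistics.IOOperation flush = stats.getFlush();

    long start = flush.begin();     // flushesInProgress +1
    try {
      long bytes = doFlush();
      flush.end(bytes, start);      // count +1, bytes and elapsed time recorded
    } catch (IOException e) {
      flush.error(start);           // ends the timing, then increments flushErrors
    }
    stats.close();
  }
}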

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedReader.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedReader.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedReader.java
new file mode 100644
index 0000000..1042e22
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedReader.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.persistence.soplog;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import org.apache.hadoop.io.RawComparator;
+
+/**
+ * Defines a means to read sorted data including performing range scans.
+ * 
+ * @param <V> type of value returned by the sorted reader
+ * 
+ */
+public interface SortedReader<V> extends Closeable {
+  /**
+   * Defines the names of additional data that may be associated with a sorted
+   * reader.
+   */
+  public enum Metadata {
+    /** identifies the disk store associated with the soplog, optional */
+    DISK_STORE,
+    
+    /** identifies the RVV data, optional */
+    RVV;
+
+    /**
+     * Converts the metadata name to bytes.
+     * @return the bytes
+     */
+    public byte[] bytes() {
+      return ("gemfire." + name()).getBytes();
+    }
+  }
+  
+  /**
+   * Filters data based on metadata values.
+   */
+  public interface MetadataFilter {
+    /**
+     * Returns the name this filter acts upon.
+     * @return the name
+     */
+    Metadata getName();
+    
+    /**
+     * Returns true if the metadata value passes the filter.
+     * @param value the value to check; may be null if the metadata value does
+     *              not exist or has not been assigned yet
+     * @return true if accepted
+     */
+    boolean accept(byte[] value);
+  }
+  
+  /**
+   * Allows comparisons between serialized objects.
+   */
+  public interface SerializedComparator extends RawComparator<byte[]> {
+  }
+  
+  /**
+   * Allows sorted iteration through a set of keys and values.
+   */
+  public interface SortedIterator<V> extends KeyValueIterator<ByteBuffer, V> {
+    /**
+     * Closes the iterator and frees any retained resources.
+     */
+    public abstract void close();
+  }
+
+  /**
+   * Defines the statistics available on a sorted file.
+   */
+  public interface SortedStatistics {
+    /**
+     * Returns the number of keys in the file.
+     * @return the key count
+     */
+    long keyCount();
+    
+    /**
+     * Returns the first key in the file.
+     * @return the first key
+     */
+    byte[] firstKey();
+    
+    /**
+     * Returns the last key in the file.
+     * @return the last key
+     */
+    byte[] lastKey();
+    
+    /**
+     * Returns the average key size in bytes.
+     * @return the average key size
+     */
+    double avgKeySize();
+    
+    /**
+     * Returns the average value size in bytes.
+     * @return the average value size
+     */
+    double avgValueSize();
+    
+    /**
+     * Frees any resources held for statistics generation.
+     */
+    void close();
+  }
+  
+  /**
+   * Returns true if the bloom filter might contain the supplied key.  The 
+   * nature of the bloom filter is such that false positives are allowed, but
+   * false negatives cannot occur.
+   * 
+   * @param key the key to test
+   * @return true if the key might be present
+   * @throws IOException read error
+   */
+  boolean mightContain(byte[] key) throws IOException;
+
+  /**
+   * Returns the value associated with the given key.
+   * 
+   * @param key the key
+   * @return the value, or null if the key is not found
+   * @throws IOException read error
+   */
+  V read(byte[] key) throws IOException;
+
+  /**
+   * Iterates from the first key in the file to the requested key.
+   * @param to the ending key
+   * @param inclusive true if the ending key is included in the iteration
+   * @return the sorted iterator
+   * @throws IOException scan error
+   */
+  SortedIterator<V> head(byte[] to, boolean inclusive) throws IOException;
+  
+  /**
+   * Iterates from the requested key to the last key in the file.
+   * @param from the starting key
+   * @param inclusive true if the starting key should be included in the iteration
+   * @return the sorted iterator
+   * @throws IOException scan error
+   */
+  SortedIterator<V> tail(byte[] from, boolean inclusive) throws IOException;
+
+  /**
+   * Iterates over the entire contents of the sorted file.
+   * 
+   * @return the sorted iterator
+   * @throws IOException scan error
+   */
+  SortedIterator<V> scan() throws IOException;
+  
+  /**
+   * Scans the available keys and allows iteration over the interval [from, to) 
+   * where the starting key is included and the ending key is excluded from 
+   * the results.
+   * 
+   * @param from the start key
+   * @param to the end key
+   * @return the sorted iterator
+   * @throws IOException scan error
+   */
+  SortedIterator<V> scan(byte[] from, byte[] to) throws IOException;
+
+  /**
+   * Scans the keys and returns an iterator over the interval [equalTo, equalTo].
+   * 
+   * @param equalTo the key to match
+   * @return the sorted iterator
+   * @throws IOException scan error
+   */
+  SortedIterator<V> scan(byte[] equalTo) throws IOException;
+  
+  /**
+   * Scans the keys and allows iteration between the given keys.
+   * 
+   * @param from the start key
+   * @param fromInclusive true if the start key is included in the scan
+   * @param to the end key
+   * @param toInclusive true if the end key is included in the scan
+   * @return the sorted iterator
+   * @throws IOException scan error
+   */
+  SortedIterator<V> scan(byte[] from, boolean fromInclusive, 
+      byte[] to, boolean toInclusive) throws IOException;
+
+  /**
+   * Scans the keys and allows iteration between the given keys after applying
+   * the metadata filter and the order flag.  These parameters override values
+   * configured using <code>withAscending</code> or <code>withFilter</code>.
+   * 
+   * @param from the start key
+   * @param fromInclusive true if the start key is included in the scan
+   * @param to the end key
+   * @param toInclusive true if the end key is included in the scan
+   * @param ascending true if ascending
+   * @param filter filters data based on metadata values
+   * @return the sorted iterator
+   * @throws IOException scan error
+   */
+  SortedIterator<V> scan(
+      byte[] from, boolean fromInclusive, 
+      byte[] to, boolean toInclusive,
+      boolean ascending,
+      MetadataFilter filter) throws IOException;
+
+  /**
+   * Changes the iteration order of subsequent operations.
+   * 
+   * @param ascending true if ascending order (default)
+   * @return the reader
+   */
+  SortedReader<V> withAscending(boolean ascending);
+  
+  /**
+   * Applies a metadata filter to subsequent operations.
+   * 
+   * @param filter the filter to apply
+   * @return the reader
+   */
+  SortedReader<V> withFilter(MetadataFilter filter);
+  
+  /**
+   * Returns the comparator used for sorting keys.
+   * @return the comparator
+   */
+  SerializedComparator getComparator();
+  
+  /**
+   * Returns the statistics regarding the keys present in the sorted file.
+   * @return the statistics
+   * @throws IOException unable to retrieve statistics
+   */
+  SortedStatistics getStatistics() throws IOException;
+}
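
A sketch of a range scan against this interface; the reader is assumed to be obtained elsewhere (e.g. from an opened soplog file), and the method name is illustrative:

import java.io.IOException;
import java.nio.ByteBuffer;
import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SortedIterator;

public class RangeScanExample {
  // Dumps the half-open interval [from, to): from included, to excluded.
  static void dumpRange(SortedReader<ByteBuffer> reader, byte[] from, byte[] to)
      throws IOException {
    SortedIterator<ByteBuffer> it = reader.scan(from, to);
    try {
      while (it.hasNext()) {
        it.next();
        System.out.println(it.key() + " -> " + it.value());
      }
    } finally {
      it.close();  // scans hold file resources and must be released
    }
  }
}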

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/TrackedReference.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/TrackedReference.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/TrackedReference.java
new file mode 100644
index 0000000..2934f07
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/TrackedReference.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.persistence.soplog;
+
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Tracks the usage of a reference.
+ * 
+ *
+ * @param <T> the reference type
+ */
+public final class TrackedReference<T> {
+  /** the referent */
+  private final T ref;
+  
+  /** the number of uses */
+  private final AtomicInteger uses;
+  
+  /** list of users using this reference. Mainly for debugging */
+  final ConcurrentHashMap<String, AtomicInteger> users;
+
+  /**
+   * Decrements the use count of each reference.
+   * @param refs the references to decrement
+   */
+  public static <T> void decrementAll(Iterable<TrackedReference<T>> refs) {
+    for (TrackedReference<?> tr : refs) {
+      tr.decrement();
+    }
+  }
+  
+  public TrackedReference(T ref) {
+    this.ref = ref;
+    uses = new AtomicInteger(0);
+    users = new ConcurrentHashMap<String, AtomicInteger>();
+  }
+  
+  /**
+   * Returns the referent.
+   * @return the referent
+   */
+  public final T get() {
+    return ref;
+  }
+  
+  /**
+   * Returns the current count.
+   * @return the current uses
+   */
+  public int uses() {
+    return uses.get();
+  }
+  
+  /**
+   * Returns true if the reference is in use.
+   * @return true if used
+   */
+  public boolean inUse() {
+    return uses() > 0;
+  }
+  
+  /**
+   * Increments the use count and returns the reference.
+   * @return the reference
+   */
+  public T getAndIncrement() {
+    increment();
+    return ref;
+  }
+  
+  /**
+   * Increments the use counter and returns the current count.
+   * @return the current uses
+   */
+  public int increment() {
+    return increment(null);
+  }
+  
+  /**
+   * Increments the use counter and returns the current count.
+   * @return the current uses
+   */
+  public int increment(String user) {
+    int val = uses.incrementAndGet();
+    if (user != null) {
+      AtomicInteger counter = users.get(user);
+      if (counter == null) {
+        counter = new AtomicInteger();
+        users.putIfAbsent(user, counter);
+        counter = users.get(user);
+      }
+      counter.incrementAndGet();
+    }
+    assert val >= 1;
+    
+    return val;
+  }
+  
+  /**
+   * Decrements the use counter and returns the current count.
+   * @return the current uses
+   */
+  public int decrement() {
+    return decrement(null);
+  }
+  
+  /**
+   * Decrements the use counter and returns the current count.
+   * @return the current uses
+   */
+  public int decrement(String user) {
+    int val = uses.decrementAndGet();
+    assert val >= 0;
+    if (user != null) {
+      AtomicInteger counter = users.get(user);
+      if (counter != null) {
+        counter.decrementAndGet();
+      }
+    }
+    
+    return val;
+  }
+  
+  @Override
+  public String toString() {
+    if (users != null) {
+      StringBuffer sb = new StringBuffer();
+      sb.append(ref.toString()).append(": ").append(uses());
+      for (Entry<String, AtomicInteger> user : users.entrySet()) {
+        sb.append(" ").append(user.getKey()).append(":").append(user.getValue().intValue());
+      }
+      return sb.toString();
+    }
+    return uses() + ": " + ref.toString();
+  }
+}
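
A usage sketch; the referent and the "compaction" user label are illustrative only:

public class TrackedReferenceExample {
  public static void main(String[] args) {
    TrackedReference<String> ref = new TrackedReference<String>("oplog-1.soplog");

    String file = ref.getAndIncrement();   // use count 0 -> 1
    ref.increment("compaction");           // 1 -> 2, tagged for debugging

    System.out.println(file + " in use: " + ref.inUse());  // true
    System.out.println(ref);               // oplog-1.soplog: 2 compaction:1

    ref.decrement("compaction");
    ref.decrement();                       // back to 0; the referent is now unused
  }
}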

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java
index ca7818a..e6c07d9 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java
@@ -1145,7 +1145,7 @@ public abstract class BaseCommand implements Command {
         VersionTagHolder versionHolder = new VersionTagHolder();
         ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID();
         // From Get70.getValueAndIsObject()
-        Object data = region.get(entryKey, null, true, true, true, id, versionHolder, true);
+        Object data = region.get(entryKey, null, true, true, true, id, versionHolder, true, false);
         VersionTag vt = versionHolder.getVersionTag();
 
         updateValues(values, entryKey, data, vt);
@@ -1252,7 +1252,7 @@ public abstract class BaseCommand implements Command {
         }
 
         ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID();
-        data = region.get(key, null, true, true, true, id, versionHolder, true);
+        data = region.get(key, null, true, true, true, id, versionHolder, true, false);
         versionTag = versionHolder.getVersionTag();
         updateValues(values, key, data, versionTag);
 
@@ -1345,7 +1345,7 @@ public abstract class BaseCommand implements Command {
       key = it.next();
       versionHolder = new VersionTagHolder();
 
-      Object value = region.get(key, null, true, true, true, requestingClient, versionHolder, true);
+      Object value = region.get(key, null, true, true, true, requestingClient, versionHolder, true, false);
       
       updateValues(values, key, value, versionHolder.getVersionTag());
 
@@ -1548,7 +1548,7 @@ public abstract class BaseCommand implements Command {
           ClientProxyMembershipID id = servConn == null ? null : servConn
               .getProxyID();
           data = region.get(key, null, true, true, true, id, versionHolder,
-              true);
+              true, false);
           versionTag = versionHolder.getVersionTag();
           updateValues(values, key, data, versionTag);
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Get70.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Get70.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Get70.java
index 55047c7..7898b3c 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Get70.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Get70.java
@@ -24,6 +24,7 @@ import com.gemstone.gemfire.cache.client.internal.GetOp;
 import com.gemstone.gemfire.cache.operations.GetOperationContext;
 import com.gemstone.gemfire.cache.operations.internal.GetOperationContextImpl;
 import com.gemstone.gemfire.distributed.internal.DistributionStats;
+import com.gemstone.gemfire.internal.Assert;
 import com.gemstone.gemfire.internal.cache.CachedDeserializable;
 import com.gemstone.gemfire.internal.cache.EntryEventImpl;
 import com.gemstone.gemfire.internal.cache.LocalRegion;
@@ -304,7 +305,7 @@ public class Get70 extends BaseCommand {
 //    } else {
       ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID();
       VersionTagHolder versionHolder = new VersionTagHolder();
-      data  = ((LocalRegion) region).get(key, callbackArg, true, true, true, id, versionHolder, true);
+      data  = ((LocalRegion) region).get(key, callbackArg, true, true, true, id, versionHolder, true, true /*allowReadFromHDFS*/);
 //    }
     versionTag = versionHolder.getVersionTag();
     

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Request.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Request.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Request.java
index 69d54a1..2a617a8 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Request.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Request.java
@@ -242,7 +242,7 @@ public class Request extends BaseCommand {
 
     boolean isObject = true;
     ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID();
-    Object data  = ((LocalRegion) region).get(key, callbackArg, true, true, true, id, null, false);
+    Object data  = ((LocalRegion) region).get(key, callbackArg, true, true, true, id, null, false, true/*allowReadFromHDFS*/);
     
     // If the value in the VM is a CachedDeserializable,
     // get its value. If it is Token.REMOVED, Token.DESTROYED,

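Get70 and Request above now pass true for the new allowReadFromHDFS argument, while the getAll path in BaseCommand passes false. A minimal sketch of how the widened LocalRegion.get signature can keep older callers compiling (the delegating overload below is an illustration, not part of this commit; parameter names are assumptions):

    // Hypothetical backward-compatible overload: the old signature delegates
    // to the widened one, defaulting to allowing reads from HDFS.
    public Object get(Object key, Object aCallbackArgument,
        boolean generateCallbacks, boolean disableCopyOnRead, boolean preferCD,
        ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent,
        boolean returnTombstones) throws TimeoutException, CacheLoaderException {
      return get(key, aCallbackArgument, generateCallbacks, disableCopyOnRead,
          preferCD, requestingClient, clientEvent, returnTombstones,
          true /*allowReadFromHDFS*/);
    }
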
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/ClientTXRegionStub.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/ClientTXRegionStub.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/ClientTXRegionStub.java
index 90522b2..e896649 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/ClientTXRegionStub.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/ClientTXRegionStub.java
@@ -67,8 +67,8 @@ public class ClientTXRegionStub implements TXRegionStub {
 
   
   public Object findObject(KeyInfo keyInfo, boolean isCreate,
-                           boolean generateCallbacks, Object value, boolean preferCD,
-                           ClientProxyMembershipID requestingClient, EntryEventImpl event) {
+      boolean generateCallbacks, Object value, boolean preferCD,
+      ClientProxyMembershipID requestingClient, EntryEventImpl event, boolean allowReadFromHDFS) {
     return proxy.get(keyInfo.getKey(), keyInfo.getCallbackArg(), event);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/DistributedTXRegionStub.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/DistributedTXRegionStub.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/DistributedTXRegionStub.java
index 1637c4a..7c7df53 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/DistributedTXRegionStub.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/DistributedTXRegionStub.java
@@ -17,10 +17,12 @@
 package com.gemstone.gemfire.internal.cache.tx;
 
 import java.util.Collections;
+import java.util.Map;
 
 import com.gemstone.gemfire.cache.CacheException;
 import com.gemstone.gemfire.cache.EntryNotFoundException;
 import com.gemstone.gemfire.cache.RegionDestroyedException;
+import com.gemstone.gemfire.cache.RemoteTransactionException;
 import com.gemstone.gemfire.cache.TransactionDataNodeHasDepartedException;
 import com.gemstone.gemfire.cache.TransactionDataNotColocatedException;
 import com.gemstone.gemfire.cache.TransactionException;
@@ -30,6 +32,7 @@ import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedM
 import com.gemstone.gemfire.internal.cache.DistributedPutAllOperation;
 import com.gemstone.gemfire.internal.cache.DistributedRemoveAllOperation;
 import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
 import com.gemstone.gemfire.internal.cache.KeyInfo;
 import com.gemstone.gemfire.internal.cache.LocalRegion;
 import com.gemstone.gemfire.internal.cache.PartitionedRegionException;
@@ -51,6 +54,7 @@ import com.gemstone.gemfire.internal.cache.partitioned.RemoteSizeMessage;
 import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
 import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.util.concurrent.StoppableReentrantReadWriteLock;
 
 public class DistributedTXRegionStub extends AbstractPeerTXRegionStub {
   
@@ -155,13 +159,9 @@ public class DistributedTXRegionStub extends AbstractPeerTXRegionStub {
   }
 
   
-  public Object findObject(KeyInfo keyInfo,
-                           boolean isCreate,
-                           boolean generateCallbacks,
-                           Object value,
-                           boolean preferCD,
-                           ClientProxyMembershipID requestingClient,
-                           EntryEventImpl clientEvent) {
+  public Object findObject(KeyInfo keyInfo, boolean isCreate,
+      boolean generateCallbacks, Object value, boolean preferCD,
+      ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean allowReadFromHDFS) {
     Object retVal = null;
     final Object key = keyInfo.getKey();
     final Object callbackArgument = keyInfo.getCallbackArg();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/PartitionedTXRegionStub.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/PartitionedTXRegionStub.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/PartitionedTXRegionStub.java
index 01b1ed8..6723646 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/PartitionedTXRegionStub.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/PartitionedTXRegionStub.java
@@ -275,15 +275,15 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub {
 
   
   public Object findObject(KeyInfo keyInfo, boolean isCreate,
-                           boolean generateCallbacks, Object value, boolean peferCD,
-                           ClientProxyMembershipID requestingClient,
-                           EntryEventImpl clientEvent) {
+      boolean generateCallbacks, Object value, boolean peferCD,
+      ClientProxyMembershipID requestingClient,
+      EntryEventImpl clientEvent, boolean allowReadFromHDFS) {
     Object retVal = null;
     final Object key = keyInfo.getKey();
     final Object callbackArgument = keyInfo.getCallbackArg();
     PartitionedRegion pr = (PartitionedRegion)region;
     try {
-      retVal = pr.getRemotely((InternalDistributedMember)state.getTarget(), keyInfo.getBucketId(), key, callbackArgument, peferCD, requestingClient, clientEvent, false);
+      retVal = pr.getRemotely((InternalDistributedMember)state.getTarget(), keyInfo.getBucketId(), key, callbackArgument, peferCD, requestingClient, clientEvent, false, allowReadFromHDFS);
     } catch (TransactionException e) {
       RuntimeException re = getTransactionException(keyInfo, e);
       re.initCause(e.getCause());

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/TXRegionStub.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/TXRegionStub.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/TXRegionStub.java
index f2859f1..482882f 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/TXRegionStub.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/TXRegionStub.java
@@ -42,8 +42,8 @@ public interface TXRegionStub {
   boolean containsValueForKey(KeyInfo keyInfo);
 
   Object findObject(KeyInfo keyInfo, boolean isCreate,
-                    boolean generateCallbacks, Object value, boolean preferCD,
-                    ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent);
+      boolean generateCallbacks, Object value, boolean preferCD,
+      ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean allowReadFromHDFS);
 
   Object getEntryForIterator(KeyInfo keyInfo, boolean allowTombstone);
 

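With the widened TXRegionStub.findObject above, every transaction stub receives allowReadFromHDFS: PartitionedTXRegionStub forwards it into getRemotely, DistributedTXRegionStub accepts it, and ClientTXRegionStub ignores it because the server resolves the read. A caller-side sketch (the helper below is hypothetical, for illustration only):

    // Hypothetical caller: a transactional read forwards the flag unchanged
    // to whichever stub implementation backs the region.
    static Object txRead(TXRegionStub stub, KeyInfo keyInfo,
        ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent,
        boolean allowReadFromHDFS) {
      return stub.findObject(keyInfo, false /*isCreate*/,
          true /*generateCallbacks*/, null /*value*/, false /*preferCD*/,
          requestingClient, clientEvent, allowReadFromHDFS);
    }
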
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
index fe09d03..94524bd 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
@@ -157,6 +157,8 @@ public abstract class AbstractGatewaySender implements GatewaySender,
   
   protected boolean isBucketSorted;
   
+  protected boolean isHDFSQueue;
+  
   protected boolean isMetaQueue;
   
   private int parallelismForReplicatedRegion;
@@ -258,6 +260,7 @@ public abstract class AbstractGatewaySender implements GatewaySender,
     this.maxMemoryPerDispatcherQueue = this.queueMemory / this.dispatcherThreads;
     this.myDSId = InternalDistributedSystem.getAnyInstance().getDistributionManager().getDistributedSystemId();
     this.serialNumber = DistributionAdvisor.createSerialNumber();
+    this.isHDFSQueue = attrs.isHDFSQueue();
     this.isMetaQueue = attrs.isMetaQueue();
     if (!(this.cache instanceof CacheCreation)) {
       this.stopper = new Stopper(cache.getCancelCriterion());
@@ -266,7 +269,8 @@ public abstract class AbstractGatewaySender implements GatewaySender,
         this.statistics = new GatewaySenderStats(cache.getDistributedSystem(),
             id);
       }
-      initializeEventIdIndex();
+      if (!attrs.isHDFSQueue())
+        initializeEventIdIndex();
     }
     this.isBucketSorted = attrs.isBucketSorted();
   }
@@ -314,10 +318,12 @@ public abstract class AbstractGatewaySender implements GatewaySender,
             cache.getDistributedSystem(), AsyncEventQueueImpl
                 .getAsyncEventQueueIdFromSenderId(id));
       }
-      initializeEventIdIndex();
+      if (!attrs.isHDFSQueue())
+        initializeEventIdIndex();
     }
     this.isBucketSorted = attrs.isBucketSorted();
-
+    this.isHDFSQueue = attrs.isHDFSQueue();
+
   }
   
   public GatewaySenderAdvisor getSenderAdvisor() {
@@ -476,6 +482,10 @@ public abstract class AbstractGatewaySender implements GatewaySender,
     return this.isBucketSorted;
   }
 
+  public boolean getIsHDFSQueue() {
+    return this.isHDFSQueue;
+  }
+  
   public boolean getIsMetaQueue() {
     return this.isMetaQueue;
   }
@@ -853,6 +863,12 @@ public abstract class AbstractGatewaySender implements GatewaySender,
       return;
     }
     
+    if (getIsHDFSQueue() && event.getOperation().isEviction()) {
+      if (logger.isDebugEnabled())
+        logger.debug("Eviction event not queued: " + event);
+      stats.incEventsNotQueued();
+      return;
+    }
     // This filter, defined by Asif, also existed in the old WAN. The new WAN
     // has its own GatewayEventFilter; we may be able to drop this one. Cheetah
     // is not considering this filter.

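The guard added to AbstractGatewaySender above filters eviction operations out of HDFS queues before they are enqueued, presumably because entries evicted from the in-memory tier still live in HDFS and must not be replayed downstream. The same condition, extracted as a predicate for illustration (the helper name is hypothetical):

    // Sketch of the drop condition: only HDFS queues filter eviction events;
    // every other operation is queued as before.
    private boolean dropsEvictionEvent(EntryEventImpl event) {
      return getIsHDFSQueue() && event.getOperation().isEviction();
    }
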
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java
index 1cef940..025616d 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java
@@ -30,6 +30,7 @@ import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
 public class GatewaySenderAttributes {
 
   public static final boolean DEFAULT_IS_BUCKETSORTED = true;
+  public static final boolean DEFAULT_IS_HDFSQUEUE = false;
   public static final boolean DEFAULT_IS_META_QUEUE = false;
 
 
@@ -81,6 +82,7 @@ public class GatewaySenderAttributes {
   
   public boolean isBucketSorted = GatewaySenderAttributes.DEFAULT_IS_BUCKETSORTED;
   
+  public boolean isHDFSQueue = GatewaySenderAttributes.DEFAULT_IS_HDFSQUEUE;
   public boolean isMetaQueue = GatewaySenderAttributes.DEFAULT_IS_META_QUEUE;
   
   public int getSocketBufferSize() {
@@ -189,6 +191,9 @@ public class GatewaySenderAttributes {
   public GatewayEventSubstitutionFilter getGatewayEventSubstitutionFilter() {
     return this.eventSubstitutionFilter;
   }
+  public boolean isHDFSQueue() {
+    return this.isHDFSQueue;
+  }
   public boolean isMetaQueue() {
     return this.isMetaQueue;
   }

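GatewaySenderAttributes exposes its flags as public fields, so HDFS queue setup code can mark a sender before construction; the flag then makes AbstractGatewaySender skip the event-id index and steers the parallel event processor (below) toward an HDFSParallelGatewaySenderQueue. A minimal wiring sketch (illustrative only; the isParallel line is an assumption):

    // Hypothetical setup: flag the attributes so the sender is built as an
    // HDFS queue rather than a regular parallel gateway sender queue.
    GatewaySenderAttributes attrs = new GatewaySenderAttributes();
    attrs.isHDFSQueue = true;  // route events through the HDFS queue code paths
    attrs.isParallel = true;   // assumption: HDFS queues build on the parallel queue
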
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
index 07a3be5..b63c7cb 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
@@ -36,6 +36,9 @@ import com.gemstone.gemfire.InternalGemFireException;
 import com.gemstone.gemfire.cache.CacheException;
 import com.gemstone.gemfire.cache.EntryEvent;
 import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSGatewayEventImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSParallelGatewaySenderQueue;
 import com.gemstone.gemfire.cache.wan.GatewayQueueEvent;
 import com.gemstone.gemfire.internal.cache.EntryEventImpl;
 import com.gemstone.gemfire.internal.cache.EnumListenerEvent;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
index f995ba4..8524ccf 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
@@ -22,6 +22,8 @@ package com.gemstone.gemfire.internal.cache.wan.parallel;
 import com.gemstone.gemfire.cache.CacheException;
 import com.gemstone.gemfire.cache.CacheListener;
 import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSGatewayEventImpl;
 import com.gemstone.gemfire.internal.cache.Conflatable;
 import com.gemstone.gemfire.internal.cache.DistributedRegion;
 import com.gemstone.gemfire.internal.cache.ForceReattemptException;
@@ -186,6 +188,11 @@ public class ConcurrentParallelGatewaySenderQueue implements RegionQueue {
    getPGSProcessor( bucketId).notifyEventProcessorIfRequired(bucketId);
   }
   
+  public HDFSBucketRegionQueue getBucketRegionQueue(PartitionedRegion region,
+    int bucketId) throws ForceReattemptException {
+    return getPGSProcessor(bucketId).getBucketRegionQueue(region, bucketId);
+  }
+  
   public void clear(PartitionedRegion pr, int bucketId) {
   	getPGSProcessor(bucketId).clear(pr, bucketId);
   }
@@ -200,6 +207,11 @@ public class ConcurrentParallelGatewaySenderQueue implements RegionQueue {
   	getPGSProcessor(bucketId).conflateEvent(conflatableObject, bucketId, tailKey);
   }
   
+  public HDFSGatewayEventImpl get(PartitionedRegion region, byte[] regionKey,
+      int bucketId) throws ForceReattemptException {
+    return getPGSProcessor(bucketId).get(region, regionKey, bucketId);
+  }
+  
   public void addShadowPartitionedRegionForUserRR(DistributedRegion userRegion) {
 	for (int i = 0; i < processors.length; i++) {
   	 processors[i].addShadowPartitionedRegionForUserRR(userRegion);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
index 11502af..417ba13 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
@@ -28,6 +28,9 @@ import org.apache.logging.log4j.Logger;
 import com.gemstone.gemfire.cache.CacheException;
 import com.gemstone.gemfire.cache.EntryEvent;
 import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSGatewayEventImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSParallelGatewaySenderQueue;
 import com.gemstone.gemfire.cache.wan.GatewayQueueEvent;
 import com.gemstone.gemfire.internal.cache.Conflatable;
 import com.gemstone.gemfire.internal.cache.DistributedRegion;
@@ -101,7 +104,10 @@ public class ParallelGatewaySenderEventProcessor extends
     }
     
     ParallelGatewaySenderQueue queue;
-    queue = new ParallelGatewaySenderQueue(this.sender, targetRs, this.index, this.nDispatcher);
+    if (sender.getIsHDFSQueue())
+      queue = new HDFSParallelGatewaySenderQueue(this.sender, targetRs, this.index, this.nDispatcher);
+    else
+      queue = new ParallelGatewaySenderQueue(this.sender, targetRs, this.index, this.nDispatcher);
     
     queue.start();
     this.queue = queue;
@@ -139,8 +145,12 @@ public class ParallelGatewaySenderEventProcessor extends
 
       // while merging 42004, kept substituteValue as it is(it is barry's
       // change 42466). bucketID is merged with eventID.getBucketID
+      if (!sender.getIsHDFSQueue())
       gatewayQueueEvent = new GatewaySenderEventImpl(operation, event,
           substituteValue, true, eventID.getBucketID());
+    else
+      gatewayQueueEvent = new HDFSGatewayEventImpl(operation,
+          event, substituteValue, true, eventID.getBucketID());
 
       if (getSender().beforeEnqueue(gatewayQueueEvent)) {
         long start = getSender().getStatistics().startTime();
@@ -198,6 +208,16 @@ public class ParallelGatewaySenderEventProcessor extends
   	((ParallelGatewaySenderQueue)this.queue).conflateEvent(conflatableObject, bucketId, tailKey);
   }
   
+  public HDFSGatewayEventImpl get(PartitionedRegion region, byte[] regionKey,
+    int bucketId) throws ForceReattemptException {
+    return ((HDFSParallelGatewaySenderQueue)this.queue).get(region, regionKey, bucketId);
+  }
+  
+  public HDFSBucketRegionQueue getBucketRegionQueue(PartitionedRegion region,
+    int bucketId) throws ForceReattemptException {
+    return ((HDFSParallelGatewaySenderQueue)this.queue).getBucketRegionQueue(region, bucketId);
+  }
+  
   public void addShadowPartitionedRegionForUserPR(PartitionedRegion pr) {
 	// TODO Auto-generated method stub
 	((ParallelGatewaySenderQueue)this.queue).addShadowPartitionedRegionForUserPR(pr);

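The get and getBucketRegionQueue accessors added above cast this.queue straight to HDFSParallelGatewaySenderQueue, which is safe only because the constructor path earlier in this file picks that queue type whenever sender.getIsHDFSQueue() is true. A defensive variant, as a sketch only:

    // Sketch: the same accessor with an explicit type check instead of a bare
    // cast, failing fast if the sender was not created as an HDFS queue.
    public HDFSGatewayEventImpl get(PartitionedRegion region, byte[] regionKey,
        int bucketId) throws ForceReattemptException {
      if (!(this.queue instanceof HDFSParallelGatewaySenderQueue)) {
        throw new IllegalStateException("Sender " + sender.getId()
            + " was not created with an HDFS queue");
      }
      return ((HDFSParallelGatewaySenderQueue) this.queue).get(region, regionKey, bucketId);
    }
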
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 46ff263..b0b1a32 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -492,7 +492,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
       if (this.userRegionNameToshadowPRMap.containsKey(regionName))
         return;
       
-      if(userPR.getDataPolicy().withPersistence() && !sender.isPersistenceEnabled()){
+      if(!isUsedForHDFS() && userPR.getDataPolicy().withPersistence() && !sender.isPersistenceEnabled()){
         throw new GatewaySenderException(
             LocalizedStrings.ParallelGatewaySenderQueue_NON_PERSISTENT_GATEWAY_SENDER_0_CAN_NOT_BE_ATTACHED_TO_PERSISTENT_REGION_1
                 .toLocalizedString(new Object[] { this.sender.getId(),
@@ -552,7 +552,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
         }
 
         ParallelGatewaySenderQueueMetaRegion meta = metaRegionFactory.newMetataRegion(cache,
-            prQName, ra, sender);
+            prQName, ra, sender, isUsedForHDFS());
 
         try {
           prQ = (PartitionedRegion)cache
@@ -630,6 +630,10 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
       bucketRegion.clear();
     }
   }
+
+  protected boolean isUsedForHDFS() {
+    return false;
+  }
   protected void afterRegionAdd (PartitionedRegion userPR) {
 
   }
@@ -1853,12 +1857,18 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     public ParallelGatewaySenderQueueMetaRegion(String regionName,
         RegionAttributes attrs, LocalRegion parentRegion,
         GemFireCacheImpl cache, AbstractGatewaySender pgSender) {
+      this( regionName, attrs, parentRegion, cache, pgSender, false);
+    }
+    public ParallelGatewaySenderQueueMetaRegion(String regionName,
+        RegionAttributes attrs, LocalRegion parentRegion,
+        GemFireCacheImpl cache, AbstractGatewaySender pgSender, boolean isUsedForHDFS) {
       super(regionName, attrs, parentRegion, cache,
           new InternalRegionArguments().setDestroyLockFlag(true)
               .setRecreateFlag(false).setSnapshotInputStream(null)
               .setImageTarget(null)
               .setIsUsedForParallelGatewaySenderQueue(true)
-              .setParallelGatewaySender((AbstractGatewaySender)pgSender));
+              .setParallelGatewaySender((AbstractGatewaySender)pgSender)
+              .setIsUsedForHDFSParallelGatewaySenderQueue(isUsedForHDFS));
       this.sender = (AbstractGatewaySender)pgSender;
       
     }
@@ -1915,9 +1925,9 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
   
   static class MetaRegionFactory {
     ParallelGatewaySenderQueueMetaRegion newMetataRegion(
-        GemFireCacheImpl cache, final String prQName, final RegionAttributes ra, AbstractGatewaySender sender) {
+        GemFireCacheImpl cache, final String prQName, final RegionAttributes ra, AbstractGatewaySender sender, boolean isUsedForHDFS) {
       ParallelGatewaySenderQueueMetaRegion meta = new ParallelGatewaySenderQueueMetaRegion(
-          prQName, ra, null, cache, sender);
+          prQName, ra, null, cache, sender, isUsedForHDFS);
       return meta;
     }
   }

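isUsedForHDFS() acts as a template-method hook: the base ParallelGatewaySenderQueue answers false, which keeps the persistent-region check and the meta-region flag on their existing paths, and HDFSParallelGatewaySenderQueue presumably overrides it. A one-line sketch of the expected override (not shown in this commit's hunks):

    // Presumed override in HDFSParallelGatewaySenderQueue: relaxes the
    // persistence check and marks the shadow region as an HDFS queue region.
    @Override
    protected boolean isUsedForHDFS() {
      return true;
    }
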
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java
index 0015665..77f9596 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java
@@ -41,6 +41,7 @@ public class AsyncEventQueueCreation implements AsyncEventQueue {
   private int maxQueueMemory = 0;
   private boolean isParallel = false;
   private boolean isBucketSorted = false;
+  private boolean isHDFSQueue = false;
   private int dispatcherThreads = 1;
   private OrderPolicy orderPolicy = OrderPolicy.KEY;
   
@@ -61,6 +62,7 @@ public class AsyncEventQueueCreation implements AsyncEventQueue {
     this.orderPolicy = senderAttrs.policy;
     this.asyncEventListener = eventListener;
     this.isBucketSorted = senderAttrs.isBucketSorted; 
+    this.isHDFSQueue = senderAttrs.isHDFSQueue;
     this.gatewayEventSubstitutionFilter = senderAttrs.eventSubstitutionFilter;
   }
   
@@ -211,4 +213,11 @@ public class AsyncEventQueueCreation implements AsyncEventQueue {
   public void setBucketSorted(boolean isBucketSorted) {
     this.isBucketSorted = isBucketSorted;
   }
+  public boolean isHDFSQueue() {
+    return this.isHDFSQueue;
+  }
+  
+  public void setIsHDFSQueue(boolean isHDFSQueue) {
+    this.isHDFSQueue = isHDFSQueue;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheCreation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheCreation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheCreation.java
index d52d05e..019079d 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheCreation.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheCreation.java
@@ -91,6 +91,12 @@ import com.gemstone.gemfire.distributed.DistributedMember;
 import com.gemstone.gemfire.distributed.DistributedSystem;
 import com.gemstone.gemfire.i18n.LogWriterI18n;
 import com.gemstone.gemfire.internal.Assert;
+import com.gemstone.gemfire.cache.hdfs.HDFSStore;
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSIntegrationUtil;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreCreation;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreFactoryImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
 import com.gemstone.gemfire.internal.cache.CacheServerImpl;
 import com.gemstone.gemfire.internal.cache.CacheConfig;
 import com.gemstone.gemfire.internal.cache.CacheServerLauncher;
@@ -193,7 +199,8 @@ public class CacheCreation implements InternalCache {
    * This is important for unit testing 44914.
    */
   protected final Map diskStores = new LinkedHashMap();
-
+  protected final Map hdfsStores = new LinkedHashMap();
+  
   private final List<File> backups = new ArrayList<File>();
 
   private CacheConfig cacheConfig = new CacheConfig();
@@ -507,6 +514,13 @@ public class CacheCreation implements InternalCache {
       }
     }
 
+    for (Iterator iter = this.hdfsStores.entrySet().iterator(); iter.hasNext();) {
+      Entry entry = (Entry) iter.next();
+      HDFSStoreCreation hdfsStoreCreation = (HDFSStoreCreation) entry.getValue();
+      HDFSStoreFactory storeFactory = cache.createHDFSStoreFactory(hdfsStoreCreation);
+      storeFactory.create((String) entry.getKey());
+    }
+
     cache.initializePdxRegistry();
 
     
@@ -517,6 +531,19 @@ public class CacheCreation implements InternalCache {
         (RegionAttributesCreation) getRegionAttributes(id);
       creation.inheritAttributes(cache, false);
 
+      // TODO: HDFS: the HDFS store/queue will be mapped against the region
+      // path, not the attribute id; the intent of this block is unclear
+      if (creation.getHDFSStoreName() != null) {
+        HDFSStoreImpl store = cache.findHDFSStore(creation.getHDFSStoreName());
+        if (store == null) {
+          HDFSIntegrationUtil.createDefaultAsyncQueueForHDFS((Cache)cache, creation.getHDFSWriteOnly(), id);
+        }
+      }
+      if (creation.getHDFSStoreName() != null && creation.getPartitionAttributes().getColocatedWith() == null) {
+        creation.addAsyncEventQueueId(HDFSStoreFactoryImpl.getEventQueueName(id));
+      }
+      
       RegionAttributes attrs;
       // Don't let the RegionAttributesCreation escape to the user
       AttributesFactory factory = new AttributesFactory(creation);
@@ -1395,6 +1422,27 @@ public class CacheCreation implements InternalCache {
   }
   
   @Override
+  public HDFSStoreFactory createHDFSStoreFactory() {
+    // delegate to the standard HDFS store factory implementation
+    return new HDFSStoreFactoryImpl(this);
+  }
+  @Override
+  public HDFSStore findHDFSStore(String storeName) {
+    return (HDFSStore)this.hdfsStores.get(storeName);
+  }
+
+  @Override
+  public Collection<HDFSStoreImpl> getHDFSStores() {
+    return this.hdfsStores.values();
+  }
+
+  public void addHDFSStore(String name, HDFSStoreCreation hs) {
+    this.hdfsStores.put(name, hs);
+  }
+
+  @Override
   public DistributedMember getMyId() {
     return null;
   }

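The hdfsStores map added to CacheCreation mirrors the existing diskStores map, and the loop added to create() replays each HDFSStoreCreation captured at parse time against the real cache through a factory. A programmatic sketch of the same factory API (setter names and values below are assumptions for illustration):

    // Hypothetical direct use of the factory that the cache.xml path drives:
    HDFSStoreFactory factory = cache.createHDFSStoreFactory();
    factory.setHomeDir("gemfire-hdfs-data");         // illustrative value
    factory.setNameNodeURL("hdfs://namenode:8020");  // illustrative value
    HDFSStore store = factory.create("myHdfsStore");
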
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXml.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXml.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXml.java
index aa7d49a..c6b0509 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXml.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXml.java
@@ -487,6 +487,8 @@ public abstract class CacheXml implements EntityResolver2, ErrorHandler {
   protected static final String PERSISTENT_REPLICATE_DP = "persistent-replicate";
   protected static final String PARTITION_DP = "partition";
   protected static final String PERSISTENT_PARTITION_DP = "persistent-partition";
+  protected static final String HDFS_PARTITION_DP = "hdfs-partition";
+  protected static final String HDFS_PERSISTENT_PARTITION_DP = "hdfs-persistent-partition";
 
   /** The name of the <code>keep-alive-timeout</code> attribute */
   protected static final String KEEP_ALIVE_TIMEOUT = "keep-alive-timeout";
@@ -763,6 +765,35 @@ public abstract class CacheXml implements EntityResolver2, ErrorHandler {
   public static final String ASYNC_EVENT_QUEUE = "async-event-queue";
   protected static final String ASYNC_EVENT_QUEUE_IDS = "async-event-queue-ids";
   
+  protected static final String HDFS_EVENT_QUEUE = "hdfs-event-queue";
+  protected static final String HDFS_STORE_NAME = "hdfs-store-name";
+  public static final String HDFS_STORE = "hdfs-store";
+  protected static final String HDFS_HOME_DIR = "home-dir";
+  protected static final String HDFS_READ_CACHE_SIZE = "read-cache-size";
+  protected static final String HDFS_MAX_MEMORY = "max-memory";
+  protected static final String HDFS_BATCH_SIZE = "batch-size";
+  protected static final String HDFS_BATCH_INTERVAL = "batch-interval";
+  protected static final String HDFS_DISPATCHER_THREADS = "dispatcher-threads";
+  protected static final String HDFS_BUFFER_PERSISTENT = "buffer-persistent";
+  protected static final String HDFS_SYNCHRONOUS_DISK_WRITE = "synchronous-disk-write";
+  protected static final String HDFS_DISK_STORE = "disk-store";
+  protected static final String HDFS_MAX_WRITE_ONLY_FILE_SIZE = "max-write-only-file-size";
+  public static final String HDFS_WRITE_ONLY_FILE_ROLLOVER_INTERVAL = "write-only-file-rollover-interval";
+  
+  protected static final String HDFS_NAMENODE_URL = "namenode-url";
+  protected static final String HDFS_CLIENT_CONFIG_FILE = "hdfs-client-config-file";
+  public static final String HDFS_PURGE_INTERVAL = "purge-interval";
+  public static final String HDFS_MAJOR_COMPACTION = "major-compaction";
+  public static final String HDFS_MAJOR_COMPACTION_INTERVAL = "major-compaction-interval";
+  public static final String HDFS_MAJOR_COMPACTION_THREADS = "major-compaction-threads";
+  public static final String HDFS_MINOR_COMPACTION = "minor-compaction";
+  public static final String HDFS_MINOR_COMPACTION_THREADS = "minor-compaction-threads";
+  
+  public static final String HDFS_TIME_FOR_FILE_ROLLOVER = "file-rollover-time-secs";
+  
+  protected static final String HDFS_WRITE_ONLY = "hdfs-write-only";
+  protected static final String HDFS_QUEUE_BATCH_SIZE = "batch-size-mb";
+  
   /** The name of the <code>compressor</code> attribute */
   protected static final String COMPRESSOR = "compressor";
   /** The name of the <code>off-heap</code> attribute