You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/04/27 22:50:09 UTC

[23/25] incubator-geode git commit: GEODE-10: Reinstating HDFS persistence code

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSParallelGatewaySenderQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSParallelGatewaySenderQueue.java
new file mode 100644
index 0000000..1e6a034
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSParallelGatewaySenderQueue.java
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+
+import com.gemstone.gemfire.cache.CacheException;
+import com.gemstone.gemfire.cache.EntryNotFoundException;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.i18n.LogWriterI18n;
+import com.gemstone.gemfire.internal.SystemTimer;
+import com.gemstone.gemfire.internal.SystemTimer.SystemTimerTask;
+import com.gemstone.gemfire.internal.cache.ColocationHelper;
+import com.gemstone.gemfire.internal.cache.ForceReattemptException;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl;
+import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
+
+/**
+ * Parallel Gateway Sender Queue extended for HDFS functionality 
+ *
+ */
+public class HDFSParallelGatewaySenderQueue extends ParallelGatewaySenderQueue {
+
+  private int currentBucketIndex = 0;
+  private int elementsPeekedAcrossBuckets = 0;
+  private SystemTimer rollListTimer = null;
+  public static final String ROLL_SORTED_LIST_TIME_INTERVAL_MS__PROP = "gemfire.ROLL_SORTED_LIST_TIME_INTERVAL_MS";
+  private final int ROLL_SORTED_LIST_TIME_INTERVAL_MS = Integer.getInteger(ROLL_SORTED_LIST_TIME_INTERVAL_MS__PROP, 3000);
+  
+  public HDFSParallelGatewaySenderQueue(AbstractGatewaySender sender,
+      Set<Region> userPRs, int idx, int nDispatcher) {
+     
+    super(sender, userPRs, idx, nDispatcher);
+    //only first dispatcher Hemant?
+    if (sender.getBucketSorted() && this.index == 0) {
+      rollListTimer = new SystemTimer(sender.getCache().getDistributedSystem(),
+          true);
+      // schedule the task to roll the skip lists
+      rollListTimer.scheduleAtFixedRate(new RollSortedListsTimerTask(), 
+          ROLL_SORTED_LIST_TIME_INTERVAL_MS, ROLL_SORTED_LIST_TIME_INTERVAL_MS);
+    }
+  }
+  
+  @Override
+  public Object peek() throws InterruptedException, CacheException {
+    /* If you call peek and use super.peek it leads to the following exception.
+     * So I'm adding an explicit UnsupportedOperationException.
+     Caused by: java.lang.ClassCastException: com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue cannot be cast to com.gemstone.gemfire.internal.cache.BucketRegionQueue
+        at com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue.getRandomPrimaryBucket(ParallelGatewaySenderQueue.java:964)
+        at com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue.peek(ParallelGatewaySenderQueue.java:1078)
+     */
+    throw new UnsupportedOperationException();
+  }
+  
+  
+  @Override
+  public void cleanUp() {
+    super.cleanUp();
+    cancelRollListTimer();
+  }
+  
+  private void cancelRollListTimer() {
+    if (rollListTimer != null) {
+      rollListTimer.cancel();
+      rollListTimer = null;
+    }
+  }
+  /**
+   * A call to this function peeks elements from the first local primary bucket. 
+   * Next call to this function peeks elements from the next local primary 
+   * bucket and so on.  
+   */
+  @Override
+  public List peek(int batchSize, int timeToWait) throws InterruptedException,
+  CacheException {
+    
+    List batch = new ArrayList();
+    
+    int batchSizeInBytes = batchSize*1024*1024;
+    PartitionedRegion prQ = getRandomShadowPR();
+    if (prQ == null || prQ.getLocalMaxMemory() == 0) {
+      try {
+        Thread.sleep(50);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+      blockProcesorThreadIfRequired();
+      return batch;
+    }
+    
+    ArrayList list = null;
+    ArrayList<Integer> pbuckets = new ArrayList<Integer>(prQ
+        .getDataStore().getAllLocalPrimaryBucketIds());
+    ArrayList<Integer> buckets = new ArrayList<Integer>();
+    for(Integer i : pbuckets) {
+    	if(i % this.nDispatcher == this.index)
+    		buckets.add(i);
+    }
+    // In case of failures, peekedEvents would possibly have some elements 
+    // add them. 
+    if (this.resetLastPeeked) {
+      int previousBucketId = -1;
+      boolean stillPrimary = true; 
+      Iterator<GatewaySenderEventImpl>  iter = peekedEvents.iterator();
+      // we need to remove the events of the bucket that are no more primary on 
+      // this node as they cannot be persisted from this node. 
+      while(iter.hasNext()) {
+        HDFSGatewayEventImpl hdfsEvent = (HDFSGatewayEventImpl)iter.next();
+        if (previousBucketId != hdfsEvent.getBucketId()){
+          stillPrimary = buckets.contains(hdfsEvent.getBucketId());
+          previousBucketId = hdfsEvent.getBucketId();
+        }
+        if (stillPrimary)
+          batch.add(hdfsEvent);
+        else {
+          iter.remove();
+        }
+      }
+      this.resetLastPeeked = false;
+    }
+    
+    if (buckets.size() == 0) {
+      // Sleep a bit before trying again. provided by Dan
+      try {
+        Thread.sleep(50);
+      }
+      catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+      return batch;
+    }
+    
+    if (this.sender.getBucketSorted()) {
+      
+    }
+    
+    // Each call to this function returns index of next bucket 
+    // that is to be processed. This function takes care 
+    // of the bucket sequence that is peeked by a sequence of 
+    // peek calls. 
+    // If there are bucket movements between two consecutive 
+    // calls to this function then there is chance that a bucket 
+    // is processed twice while another one is skipped. But, that is 
+    // ok because in the next round, it will be processed. 
+    Integer bIdIndex = getCurrentBucketIndex(buckets.size());
+    
+    // If we have gone through all the buckets once and no  
+    // elements were peeked from any of the buckets, take a nap.  
+    // This always sleep in the first call but that should be ok  
+    // because the timeToWait in practical use cases would be greater 
+    // than this sleep of 100 ms.  
+    if (bIdIndex == 0 && getAndresetElementsPeekedAcrossBuckets() == 0) { 
+      try { 
+        Thread.sleep(100); 
+      } catch (InterruptedException e) { 
+        Thread.currentThread().interrupt(); 
+      } 
+    } 
+    
+    HDFSBucketRegionQueue hrq = ((HDFSBucketRegionQueue)prQ
+        .getDataStore().getLocalBucketById(buckets.get(bIdIndex)));
+    
+    if (hrq == null) {
+      // bucket moved to another node after getAllLocalPrimaryBucketIds
+      // was called. Peeking not possible. return. 
+      return batch;
+    }
+    long entriesWaitingTobePeeked = hrq.totalEntries();
+    
+    if (entriesWaitingTobePeeked == 0) {
+      blockProcesorThreadIfRequired();
+      return batch;
+    }
+    
+    long currentTimeInMillis = System.currentTimeMillis();
+    long bucketSizeInBytes = hrq.getQueueSizeInBytes();
+    if (((currentTimeInMillis - hrq.getLastPeekTimeInMillis()) >  timeToWait)  
+        || ( bucketSizeInBytes > batchSizeInBytes)
+        || hrq.shouldDrainImmediately()) {
+      // peek now
+      if (logger.isDebugEnabled()) { 
+        logger.debug("Peeking queue " + hrq.getId()   + ": bucketSizeInBytes " + bucketSizeInBytes
+            + ":  batchSizeInBytes" + batchSizeInBytes
+            + ":  timeToWait" + timeToWait
+            + ":  (currentTimeInMillis - hrq.getLastPeekTimeInMillis())" + (currentTimeInMillis - hrq.getLastPeekTimeInMillis()));
+      }
+
+      list = peekAhead(buckets.get(bIdIndex), hrq);
+      
+      if (list != null && list.size() != 0 ) {
+        for (Object object : list) {
+          batch.add(object);
+          peekedEvents.add((HDFSGatewayEventImpl)object);
+        }
+      }
+    }
+    else {
+      blockProcesorThreadIfRequired();
+    }
+    if (logger.isDebugEnabled()  &&  batch.size() > 0) {
+      logger.debug(this + ":  Peeked a batch of " + batch.size() + " entries");
+    }
+    
+    setElementsPeekedAcrossBuckets(batch.size()); 
+    
+    return batch;
+  }
+  
+  /**
+   * This function maintains an index of the last processed bucket.
+   * When it is called, it returns index of the next bucket. 
+   * @param totalBuckets
+   * @return current bucket index
+   */
+  private int getCurrentBucketIndex(int totalBuckets) {
+    int retBucket = currentBucketIndex;
+    if (retBucket >=  totalBuckets) {
+      currentBucketIndex = 0;
+      retBucket = 0;
+    }
+    
+    currentBucketIndex++;
+    
+    return retBucket;
+  }
+  
+  @Override
+  public void remove(int batchSize) throws CacheException {
+    int destroyed = 0;
+    HDFSGatewayEventImpl event = null;
+    
+    if (this.peekedEvents.size() > 0)
+      event = (HDFSGatewayEventImpl)this.peekedEvents.remove();
+    
+    while (event != null && destroyed < batchSize) {
+      Region currentRegion = event.getRegion();
+      int currentBucketId = event.getBucketId();
+      int bucketId = event.getBucketId();
+        
+      ArrayList<HDFSGatewayEventImpl> listToDestroy = new ArrayList<HDFSGatewayEventImpl>();
+      ArrayList<Object> destroyedSeqNum = new ArrayList<Object>();
+      
+      // create a batch of all the entries of a bucket 
+      while (bucketId == currentBucketId) {
+        listToDestroy.add(event);
+        destroyedSeqNum.add(event.getShadowKey());
+        destroyed++;
+
+        if (this.peekedEvents.size() == 0 || (destroyed) >= batchSize) {
+          event = null; 
+          break;
+        }
+
+        event = (HDFSGatewayEventImpl)this.peekedEvents.remove();
+
+        bucketId = event.getBucketId();
+
+        if (!this.sender.isRunning()){
+          if (logger.isDebugEnabled()) {
+            logger.debug("ParallelGatewaySenderQueue#remove: Cache is closing down. Ignoring remove request.");
+          }
+          return;
+        }
+      }
+      try {
+        HDFSBucketRegionQueue brq = getBucketRegionQueue((PartitionedRegion) currentRegion, currentBucketId);
+        
+        if (brq != null) {
+          // destroy the entries from the bucket 
+          brq.destroyKeys(listToDestroy);
+          // Adding the removed event to the map for BatchRemovalMessage
+          // We need to provide the prQ as there could be multiple
+          // queue in a PGS now.
+          PartitionedRegion prQ = brq.getPartitionedRegion();
+          addRemovedEvents(prQ, currentBucketId, destroyedSeqNum);
+        }
+        
+      } catch (ForceReattemptException e) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("ParallelGatewaySenderQueue#remove: " + "Got ForceReattemptException for " + this
+          + " for bucket = " + bucketId);
+        }
+      }
+      catch(EntryNotFoundException e) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("ParallelGatewaySenderQueue#remove: " + "Got EntryNotFoundException for " + this
+            + " for bucket = " + bucketId );
+        }
+      }
+    }
+  }
+  
+  /** 
+  * Keeps a track of number of elements peeked across all buckets.  
+  */ 
+  private void setElementsPeekedAcrossBuckets(int peekedElements) { 
+    this.elementsPeekedAcrossBuckets +=peekedElements; 
+  } 
+  
+  /** 
+  * Returns the number of elements peeked across all buckets. Also, 
+  * resets this counter. 
+  */ 
+  private int getAndresetElementsPeekedAcrossBuckets() { 
+    int peekedElements = this.elementsPeekedAcrossBuckets; 
+    this.elementsPeekedAcrossBuckets = 0; 
+    return peekedElements; 
+  } 
+
+  @Override
+  public void remove() throws CacheException {
+    throw new UnsupportedOperationException("Method HDFSParallelGatewaySenderQueue#remove is not supported");
+  }
+ 
+  @Override
+  public void put(Object object) throws InterruptedException, CacheException {
+    super.put(object);
+  }
+  
+  protected ArrayList peekAhead(int bucketId, HDFSBucketRegionQueue hrq) throws CacheException {
+    
+    if (logger.isDebugEnabled()) {
+      logger.debug(this + ": Peekahead for the bucket " + bucketId);
+    }
+    ArrayList  list = hrq.peekABatch();
+    if (logger.isDebugEnabled() && list != null ) {
+      logger.debug(this + ": Peeked" + list.size() + "objects from bucket " + bucketId);
+    }
+
+    return list;
+  }
+  
+  @Override
+  public Object take() {
+    throw new UnsupportedOperationException("take() is not supported for " + HDFSParallelGatewaySenderQueue.class.toString());
+  }
+  
+  protected boolean isUsedForHDFS()
+  {
+    return true;
+  }
+  
+  @Override
+  protected void afterRegionAdd (PartitionedRegion userPR) {
+  }
+  
+  /**
+   * gets the value for region key from the HDFSBucketRegionQueue 
+ * @param region 
+   * @throws ForceReattemptException 
+   */
+  public HDFSGatewayEventImpl get(PartitionedRegion region, byte[] regionKey, int bucketId) throws ForceReattemptException  {
+    try {
+      HDFSBucketRegionQueue brq = getBucketRegionQueue(region, bucketId);
+      
+      if (brq ==null)
+        return null;
+      
+      return brq.getObjectForRegionKey(region, regionKey);
+    } catch(EntryNotFoundException e) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("HDFSParallelGatewaySenderQueue#get: " + "Got EntryNotFoundException for " + this
+            + " for bucket = " + bucketId);
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public void clear(PartitionedRegion pr, int bucketId) {
+    HDFSBucketRegionQueue brq;
+    try {
+      brq = getBucketRegionQueue(pr, bucketId);
+      if (brq == null)
+        return;
+      brq.clear();
+    } catch (ForceReattemptException e) {
+      //do nothing, bucket was destroyed.
+    }
+  }
+  
+  @Override
+  public int size(PartitionedRegion pr, int bucketId) throws ForceReattemptException {
+   HDFSBucketRegionQueue hq = getBucketRegionQueue(pr, bucketId);
+   return hq.size();
+  }
+
+  public HDFSBucketRegionQueue getBucketRegionQueue(PartitionedRegion region,
+      int bucketId) throws ForceReattemptException {
+    PartitionedRegion leader = ColocationHelper.getLeaderRegion(region);
+    if (leader == null)
+      return null;
+    String leaderregionPath = leader.getFullPath();
+    PartitionedRegion prQ = this.userRegionNameToshadowPRMap.get(leaderregionPath);
+    if (prQ == null)
+      return null;
+    HDFSBucketRegionQueue brq;
+
+    brq = ((HDFSBucketRegionQueue)prQ.getDataStore()
+        .getLocalBucketById(bucketId));
+    if(brq == null) {
+      prQ.getRegionAdvisor().waitForLocalBucketStorage(bucketId);
+    }
+    brq = ((HDFSBucketRegionQueue)prQ.getDataStore()
+        .getInitializedBucketForId(null, bucketId));
+    return brq;
+  }
+  
+  /**
+   * This class has the responsibility of rolling the lists of Sorted event 
+   * Queue. The rolling of lists by a separate thread is required because 
+   * neither put thread nor the peek/remove thread can do that. Put thread
+   * cannot do it because that would mean doing some synchronization with 
+   * other put threads and peek thread that would hamper the put latency. 
+   * Peek thread cannot do it because if the event insert rate is too high
+   * the list size can go way beyond what its size. 
+   *
+   */
+  class RollSortedListsTimerTask extends SystemTimerTask {
+    
+    
+    /**
+     * This function ensures that if any of the buckets has lists that are beyond 
+     * its size, they gets rolled over into new skip lists. 
+     */
+    @Override
+    public void run2() {
+      Set<PartitionedRegion> prQs = getRegions();
+      for (PartitionedRegion prQ : prQs) {
+        ArrayList<Integer> buckets = new ArrayList<Integer>(prQ
+            .getDataStore().getAllLocalPrimaryBucketIds());
+        for (Integer bId : buckets) {
+          HDFSBucketRegionQueue hrq =  ((HDFSBucketRegionQueue)prQ
+              .getDataStore().getLocalBucketById(bId));
+          if (hrq == null) {
+            // bucket moved to another node after getAllLocalPrimaryBucketIds
+            // was called. continue fixing the next bucket. 
+            continue;
+          }
+          if (logger.isDebugEnabled()) {
+            logger.debug("Rolling over the list for bucket id: " + bId);
+          }
+          hrq.rolloverSkipList();
+         }
+      }
+    }
+  }
+   
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreConfigHolder.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreConfigHolder.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreConfigHolder.java
new file mode 100644
index 0000000..16d3d87
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreConfigHolder.java
@@ -0,0 +1,559 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.Serializable;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.GemFireConfigException;
+import com.gemstone.gemfire.cache.hdfs.HDFSStore;
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory;
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator;
+import com.gemstone.gemfire.cache.hdfs.StoreExistsException;
+import com.gemstone.gemfire.internal.cache.xmlcache.CacheXml;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+
+/**
+ * Class to hold all hdfs store related configuration. Instead of copying the
+ * same members in two different classes, factory and store, this class will be
+ * used. The idea is let HdfsStoreImpl and HdfsStoreCreation delegate get calls,
+ * set calls and copy constructor calls this class. Moreover this config holder
+ * can be entirely replaced to support alter config
+ * 
+ */
+public class HDFSStoreConfigHolder implements HDFSStore, HDFSStoreFactory ,Serializable {  
+  private String name = null;
+  private String namenodeURL = null;
+  private String homeDir = DEFAULT_HOME_DIR;
+  private String clientConfigFile = null;
+  private float blockCacheSize = DEFAULT_BLOCK_CACHE_SIZE;
+  private int maxFileSize = DEFAULT_WRITE_ONLY_FILE_SIZE_LIMIT;
+  private int fileRolloverInterval = DEFAULT_WRITE_ONLY_FILE_ROLLOVER_INTERVAL;
+  protected boolean isAutoCompact = DEFAULT_MINOR_COMPACTION;
+  protected boolean autoMajorCompact = DEFAULT_MAJOR_COMPACTION;
+  protected int maxConcurrency = DEFAULT_MINOR_COMPACTION_THREADS;
+  protected int majorCompactionConcurrency = DEFAULT_MAJOR_COMPACTION_THREADS;
+  protected int majorCompactionIntervalMins = DEFAULT_MAJOR_COMPACTION_INTERVAL_MINS;
+  protected int maxInputFileSizeMB = DEFAULT_INPUT_FILE_SIZE_MAX_MB;
+  protected int maxInputFileCount = DEFAULT_INPUT_FILE_COUNT_MAX;
+  protected int minInputFileCount = DEFAULT_INPUT_FILE_COUNT_MIN;
+  protected int oldFileCleanupIntervalMins = DEFAULT_OLD_FILE_CLEANUP_INTERVAL_MINS;
+  
+  protected int batchSize = DEFAULT_BATCH_SIZE_MB;
+  protected int batchIntervalMillis = DEFAULT_BATCH_INTERVAL_MILLIS;
+  protected int maximumQueueMemory = DEFAULT_MAX_BUFFER_MEMORY;
+  protected boolean isPersistenceEnabled = DEFAULT_BUFFER_PERSISTANCE;
+  protected String diskStoreName = null;
+  protected boolean diskSynchronous = DEFAULT_DISK_SYNCHRONOUS; 
+  protected int dispatcherThreads = DEFAULT_DISPATCHER_THREADS;
+  
+  private static final Logger logger = LogService.getLogger();
+  protected final String logPrefix;
+
+  public HDFSStoreConfigHolder() {
+    this(null);
+  }
+
+  /**
+   * @param config configuration source for creating this instance 
+   */
+  public HDFSStoreConfigHolder(HDFSStore config) {
+    this.logPrefix = "<" + getName() + "> ";
+    if (config == null) {
+      return;
+    }
+    
+    this.name = config.getName();
+    this.namenodeURL = config.getNameNodeURL();
+    this.homeDir = config.getHomeDir();
+    this.clientConfigFile = config.getHDFSClientConfigFile();
+    this.blockCacheSize = config.getBlockCacheSize();
+    this.maxFileSize = config.getWriteOnlyFileRolloverSize();
+    this.fileRolloverInterval = config.getWriteOnlyFileRolloverInterval();
+    isAutoCompact = config.getMinorCompaction();
+    maxConcurrency = config.getMinorCompactionThreads();
+    autoMajorCompact = config.getMajorCompaction();
+    majorCompactionConcurrency = config.getMajorCompactionThreads();
+    majorCompactionIntervalMins = config.getMajorCompactionInterval();
+    maxInputFileSizeMB = config.getInputFileSizeMax();
+    maxInputFileCount = config.getInputFileCountMax();
+    minInputFileCount = config.getInputFileCountMin();
+    oldFileCleanupIntervalMins = config.getPurgeInterval();
+    
+    batchSize = config.getBatchSize();
+    batchIntervalMillis = config.getBatchInterval();
+    maximumQueueMemory = config.getMaxMemory();
+    isPersistenceEnabled = config.getBufferPersistent();
+    diskStoreName = config.getDiskStoreName();
+    diskSynchronous = config.getSynchronousDiskWrite();
+    dispatcherThreads = config.getDispatcherThreads();
+  }
+  
+  public void resetDefaultValues() {
+    name = null;
+    namenodeURL = null;
+    homeDir = null;
+    clientConfigFile = null;
+    blockCacheSize = -1f;
+    maxFileSize = -1;
+    fileRolloverInterval = -1;
+    
+    isAutoCompact = false;
+    maxConcurrency = -1;
+    maxInputFileSizeMB = -1;
+    maxInputFileCount = -1;
+    minInputFileCount = -1;
+    oldFileCleanupIntervalMins = -1;
+
+    autoMajorCompact = false;
+    majorCompactionConcurrency = -1;
+    majorCompactionIntervalMins = -1;
+    
+    batchSize = -1;
+    batchIntervalMillis = -1;
+    maximumQueueMemory = -1;
+    isPersistenceEnabled = false;
+    diskStoreName = null;
+    diskSynchronous = false; 
+    dispatcherThreads = -1;
+  }
+  
+  public void copyFrom(HDFSStoreMutator mutator) {
+    if (mutator.getWriteOnlyFileRolloverInterval() >= 0) {
+      logAttrMutation("fileRolloverInterval", mutator.getWriteOnlyFileRolloverInterval());
+      setWriteOnlyFileRolloverInterval(mutator.getWriteOnlyFileRolloverInterval());
+    }
+    if (mutator.getWriteOnlyFileRolloverSize() >= 0) {
+      logAttrMutation("MaxFileSize", mutator.getWriteOnlyFileRolloverInterval());
+      setWriteOnlyFileRolloverSize(mutator.getWriteOnlyFileRolloverSize());
+    }
+    
+    if (mutator.getMinorCompaction() != null) {
+      logAttrMutation("MinorCompaction", mutator.getMinorCompaction());
+      setMinorCompaction(mutator.getMinorCompaction());
+    }
+    
+    if (mutator.getMinorCompactionThreads() >= 0) {
+      logAttrMutation("MaxThreads", mutator.getMinorCompactionThreads());
+      setMinorCompactionThreads(mutator.getMinorCompactionThreads());
+    }
+    
+    if (mutator.getMajorCompactionInterval() > -1) {
+      logAttrMutation("MajorCompactionIntervalMins", mutator.getMajorCompactionInterval());
+      setMajorCompactionInterval(mutator.getMajorCompactionInterval());
+    }
+    if (mutator.getMajorCompactionThreads() >= 0) {
+      logAttrMutation("MajorCompactionMaxThreads", mutator.getMajorCompactionThreads());
+      setMajorCompactionThreads(mutator.getMajorCompactionThreads());
+    }
+    if (mutator.getMajorCompaction() != null) {
+      logAttrMutation("AutoMajorCompaction", mutator.getMajorCompaction());
+      setMajorCompaction(mutator.getMajorCompaction());
+    }
+    if (mutator.getInputFileCountMax() >= 0) {
+      logAttrMutation("maxInputFileCount", mutator.getInputFileCountMax());
+      setInputFileCountMax(mutator.getInputFileCountMax());
+    }
+    if (mutator.getInputFileSizeMax() >= 0) {
+      logAttrMutation("MaxInputFileSizeMB", mutator.getInputFileSizeMax());
+      setInputFileSizeMax(mutator.getInputFileSizeMax());
+    }
+    if (mutator.getInputFileCountMin() >= 0) {
+      logAttrMutation("MinInputFileCount", mutator.getInputFileCountMin());
+      setInputFileCountMin(mutator.getInputFileCountMin());
+    }    
+    if (mutator.getPurgeInterval() >= 0) {
+      logAttrMutation("OldFilesCleanupIntervalMins", mutator.getPurgeInterval());
+      setPurgeInterval(mutator.getPurgeInterval());
+    }
+    
+    if (mutator.getBatchSize() >= 0) {
+      logAttrMutation("batchSizeMB", mutator.getWriteOnlyFileRolloverInterval());
+      setBatchSize(mutator.getBatchSize());
+    }
+    if (mutator.getBatchInterval() >= 0) {
+      logAttrMutation("batchTimeInterval", mutator.getWriteOnlyFileRolloverInterval());
+      setBatchInterval(mutator.getBatchInterval());
+    }
+  }
+
+  void logAttrMutation(String name, Object value) {
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}Alter " + name + ":" + value, logPrefix);
+    }
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+  @Override
+  public HDFSStoreFactory setName(String name) {
+    this.name = name;
+    return this;
+  }
+
+  @Override
+  public String getNameNodeURL() {
+    return namenodeURL;
+  }
+  @Override
+  public HDFSStoreFactory setNameNodeURL(String namenodeURL) {
+    this.namenodeURL = namenodeURL;
+    return this;
+  }
+
+  @Override
+  public String getHomeDir() {
+    return homeDir;
+  }
+  @Override
+  public HDFSStoreFactory setHomeDir(String homeDir) {
+    this.homeDir = homeDir;
+    return this;
+  }
+
+  @Override
+  public String getHDFSClientConfigFile() {
+    return clientConfigFile;
+  }
+  @Override
+  public HDFSStoreFactory setHDFSClientConfigFile(String clientConfigFile) {
+    this.clientConfigFile = clientConfigFile;
+    return this;
+  }
+  
+  @Override
+  public HDFSStoreFactory setBlockCacheSize(float percentage) {
+    if(percentage < 0 || percentage > 100) {
+      throw new IllegalArgumentException("Block cache size must be between 0 and 100, inclusive");
+    }
+    this.blockCacheSize  = percentage;
+    return this;
+  }
+  
+  @Override
+  public float getBlockCacheSize() {
+    return blockCacheSize;
+  }
+  
+  @Override
+  public HDFSStoreFactory setWriteOnlyFileRolloverSize(int maxFileSize) {
+    assertIsPositive(CacheXml.HDFS_WRITE_ONLY_FILE_ROLLOVER_INTERVAL, maxFileSize);
+    this.maxFileSize = maxFileSize;
+    return this;
+  }
+  @Override
+  public int getWriteOnlyFileRolloverSize() {
+    return maxFileSize;
+  }
+
+  @Override
+  public HDFSStoreFactory setWriteOnlyFileRolloverInterval(int count) {
+    assertIsPositive(CacheXml.HDFS_TIME_FOR_FILE_ROLLOVER, count);
+    this.fileRolloverInterval = count;
+    return this;
+  }
+  @Override
+  public int getWriteOnlyFileRolloverInterval() {
+    return fileRolloverInterval;
+  }
+  
+  @Override
+  public boolean getMinorCompaction() {
+    return isAutoCompact;
+  }
+  @Override
+  public HDFSStoreFactory setMinorCompaction(boolean auto) {
+    this.isAutoCompact = auto;
+    return this;
+  }
+
+  @Override
+  public HDFSStoreFactory setMinorCompactionThreads(int count) {
+    assertIsPositive(CacheXml.HDFS_MINOR_COMPACTION_THREADS, count);
+    this.maxConcurrency = count;
+    return this;
+  }
+  @Override
+  public int getMinorCompactionThreads() {
+    return maxConcurrency;
+  }
+
+  @Override
+  public HDFSStoreFactory setMajorCompaction(boolean auto) {
+    this.autoMajorCompact = auto;
+    return this;
+  }
+  @Override
+  public boolean getMajorCompaction() {
+    return autoMajorCompact;
+  }
+
+  @Override
+  public HDFSStoreFactory setMajorCompactionInterval(int count) {
+    HDFSStoreCreation.assertIsPositive(CacheXml.HDFS_MAJOR_COMPACTION_INTERVAL, count);
+    this.majorCompactionIntervalMins = count;
+    return this;
+  }
+  @Override
+  public int getMajorCompactionInterval() {
+    return majorCompactionIntervalMins;
+  }
+
+  @Override
+  public HDFSStoreFactory setMajorCompactionThreads(int count) {
+    HDFSStoreCreation.assertIsPositive(CacheXml.HDFS_MAJOR_COMPACTION_THREADS, count);
+    this.majorCompactionConcurrency = count;
+    return this;
+  }
+  @Override
+  public int getMajorCompactionThreads() {
+    return majorCompactionConcurrency;
+  }
+  
+  @Override
+  public HDFSStoreFactory setInputFileSizeMax(int size) {
+    HDFSStoreCreation.assertIsPositive("HDFS_COMPACTION_MAX_INPUT_FILE_SIZE_MB", size);
+    this.maxInputFileSizeMB = size;
+    return this;
+  }
+  @Override
+  public int getInputFileSizeMax() {
+    return maxInputFileSizeMB;
+  }
+
+  @Override
+  public HDFSStoreFactory setInputFileCountMin(int count) {
+    HDFSStoreCreation.assertIsPositive("HDFS_COMPACTION_MIN_INPUT_FILE_COUNT", count);
+    this.minInputFileCount = count;
+    return this;
+  }
+  @Override
+  public int getInputFileCountMin() {
+    return minInputFileCount;
+  }
+
+  @Override
+  public HDFSStoreFactory setInputFileCountMax(int count) {
+    HDFSStoreCreation.assertIsPositive("HDFS_COMPACTION_MAX_INPUT_FILE_COUNT", count);
+    this.maxInputFileCount = count;
+    return this;
+  }
+  @Override
+  public int getInputFileCountMax() {
+    return maxInputFileCount;
+  }
+
+  @Override
+  public int getPurgeInterval() {
+    return oldFileCleanupIntervalMins ;
+  }    
+  @Override
+  public HDFSStoreFactory setPurgeInterval(int interval) {
+    assertIsPositive(CacheXml.HDFS_PURGE_INTERVAL, interval);
+    this.oldFileCleanupIntervalMins = interval;
+    return this;
+  }
+  
+  protected void validate() {
+    if (minInputFileCount > maxInputFileCount) {
+      throw new IllegalArgumentException(
+          LocalizedStrings.HOPLOG_MIN_IS_MORE_THAN_MAX
+          .toLocalizedString(new Object[] {
+              "HDFS_COMPACTION_MIN_INPUT_FILE_COUNT",
+              minInputFileCount,
+              "HDFS_COMPACTION_MAX_INPUT_FILE_COUNT",
+              maxInputFileCount }));
+    }
+  }
+
+  /**
+   * This method should not be called on this class.
+   * @see HDFSStoreFactory#create(String)
+   */
+  @Override
+  public HDFSStore create(String name) throws GemFireConfigException,
+      StoreExistsException {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * This method should not be called on this class.
+   * @see HDFSStoreImpl#destroy()
+   */
+  @Override
+  public void destroy() {
+    throw new UnsupportedOperationException();
+  }
+  
+  public static void assertIsPositive(String name, int count) {
+    if (count < 1) {
+      throw new IllegalArgumentException(
+          LocalizedStrings.DiskWriteAttributesImpl_0_HAS_TO_BE_POSITIVE_NUMBER_AND_THE_VALUE_GIVEN_1_IS_NOT_ACCEPTABLE
+              .toLocalizedString(new Object[] { name, count }));
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    builder.append("HDFSStoreConfigHolder@");
+    builder.append(System.identityHashCode(this));
+    builder.append(" [");
+    appendStrProp(builder, name, "name");
+    appendStrProp(builder, namenodeURL, "namenodeURL");
+    appendStrProp(builder, homeDir, "homeDir");
+    appendStrProp(builder, clientConfigFile, "clientConfigFile");
+    if (blockCacheSize > -1) {
+      builder.append("blockCacheSize=");
+      builder.append(blockCacheSize);
+      builder.append(", ");
+    }
+    appendIntProp(builder, maxFileSize, "maxFileSize");
+    appendIntProp(builder, fileRolloverInterval, "fileRolloverInterval");
+    appendBoolProp(builder, isAutoCompact, "isAutoCompact");
+    appendBoolProp(builder, autoMajorCompact, "autoMajorCompact");
+    appendIntProp(builder, maxConcurrency, "maxConcurrency");
+    appendIntProp(builder, majorCompactionConcurrency, "majorCompactionConcurrency");
+    appendIntProp(builder, majorCompactionIntervalMins, "majorCompactionIntervalMins");
+    appendIntProp(builder, maxInputFileSizeMB, "maxInputFileSizeMB");
+    appendIntProp(builder, maxInputFileCount, "maxInputFileCount");
+    appendIntProp(builder, minInputFileCount, "minInputFileCount");
+    appendIntProp(builder, oldFileCleanupIntervalMins, "oldFileCleanupIntervalMins");
+    appendIntProp(builder, batchSize, "batchSize");
+    appendIntProp(builder, batchIntervalMillis, "batchInterval");
+    appendIntProp(builder, maximumQueueMemory, "maximumQueueMemory");
+    appendIntProp(builder, dispatcherThreads, "dispatcherThreads");
+    appendBoolProp(builder, isPersistenceEnabled, "isPersistenceEnabled");
+    appendStrProp(builder, diskStoreName, "diskStoreName");
+    appendBoolProp(builder, diskSynchronous, "diskSynchronous");
+
+    builder.append("]");
+    return builder.toString();
+  }
+
+  private void appendStrProp(StringBuilder builder, String value, String name) {
+    if (value != null) {
+      builder.append(name + "=");
+      builder.append(value);
+      builder.append(", ");
+    }
+  }
+
+  private void appendIntProp(StringBuilder builder, int value, String name) {
+    if (value > -1) {
+      builder.append(name + "=");
+      builder.append(value);
+      builder.append(", ");
+    }
+  }
+  
+  private void appendBoolProp(StringBuilder builder, boolean value, String name) {
+    builder.append(name + "=");
+    builder.append(value);
+    builder.append(", ");
+  }
+
+  @Override
+  public HDFSStoreMutator createHdfsStoreMutator() {
+    // as part of alter execution, hdfs store will replace the config holder
+    // completely. Hence mutator at the config holder is not needed
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public HDFSStore alter(HDFSStoreMutator mutator) {
+    // as part of alter execution, hdfs store will replace the config holder
+    // completely. Hence mutator at the config holder is not needed
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public String getDiskStoreName() {
+    return this.diskStoreName;
+  }
+  @Override
+  public HDFSStoreFactory setDiskStoreName(String name) {
+    this.diskStoreName = name;
+    return this;
+  }
+
+  @Override
+  public int getBatchInterval() {
+    return this.batchIntervalMillis;
+  }
+  @Override
+  public HDFSStoreFactory setBatchInterval(int intervalMillis){
+    this.batchIntervalMillis = intervalMillis;
+    return this;
+  }
+  
+  @Override
+  public boolean getBufferPersistent() {
+    return isPersistenceEnabled;
+  }
+  @Override
+  public HDFSStoreFactory setBufferPersistent(boolean isPersistent) {
+    this.isPersistenceEnabled = isPersistent;
+    return this;
+  }
+
+  @Override
+  public int getDispatcherThreads() {
+    return dispatcherThreads;
+  }
+  @Override
+  public HDFSStoreFactory setDispatcherThreads(int dispatcherThreads) {
+    this.dispatcherThreads = dispatcherThreads;
+    return this;
+  }
+  
+  @Override
+  public int getMaxMemory() {
+    return this.maximumQueueMemory;
+  }
+  @Override
+  public HDFSStoreFactory setMaxMemory(int memory) {
+    this.maximumQueueMemory = memory;
+    return this;
+  }
+  
+  @Override
+  public int getBatchSize() {
+    return this.batchSize;
+  }
+  @Override
+  public HDFSStoreFactory setBatchSize(int size){
+    this.batchSize = size;
+    return this;
+  }
+  
+  @Override
+  public boolean getSynchronousDiskWrite() {
+    return this.diskSynchronous;
+  }
+  @Override
+  public HDFSStoreFactory setSynchronousDiskWrite(boolean isSynchronous) {
+    this.diskSynchronous = isSynchronous;
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreCreation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreCreation.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreCreation.java
new file mode 100644
index 0000000..9ecc5e3
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreCreation.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import com.gemstone.gemfire.GemFireConfigException;
+import com.gemstone.gemfire.cache.hdfs.HDFSStore;
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory;
+import com.gemstone.gemfire.cache.hdfs.StoreExistsException;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+
+/**
+ */
+public class HDFSStoreCreation implements HDFSStoreFactory {
+  protected HDFSStoreConfigHolder configHolder;
+  
+  public HDFSStoreCreation() {
+    this(null);
+  }
+
+  /**
+   * Copy constructor for HDFSStoreCreation
+   * @param config configuration source for creating this instance 
+   */
+  public HDFSStoreCreation(HDFSStoreCreation config) {
+    this.configHolder = new HDFSStoreConfigHolder(config == null ? null : config.configHolder);
+  }
+
+  @Override
+  public HDFSStoreFactory setName(String name) {
+    configHolder.setName(name);
+    return this;
+  }
+
+  @Override
+  public HDFSStoreFactory setNameNodeURL(String namenodeURL) {
+    configHolder.setNameNodeURL(namenodeURL);
+    return this;
+  }
+
+  @Override
+  public HDFSStoreFactory setHomeDir(String homeDir) {
+    configHolder.setHomeDir(homeDir);
+    return this;
+  }
+
+  @Override
+  public HDFSStoreFactory setHDFSClientConfigFile(String clientConfigFile) {
+    configHolder.setHDFSClientConfigFile(clientConfigFile);
+    return this;
+  }
+  
+  @Override
+  public HDFSStoreFactory setBlockCacheSize(float percentage) {
+    configHolder.setBlockCacheSize(percentage);
+    return this;
+  }
+  
+  @Override
+  public HDFSStoreFactory setWriteOnlyFileRolloverSize(int maxFileSize) {
+    configHolder.setWriteOnlyFileRolloverSize(maxFileSize);
+    return this;
+  }
+
+  @Override
+  public HDFSStoreFactory setWriteOnlyFileRolloverInterval(int count) {
+    configHolder.setWriteOnlyFileRolloverInterval(count);
+    return this;
+  }
+
+  @Override
+  public HDFSStoreFactory setMinorCompaction(boolean auto) {
+    configHolder.setMinorCompaction(auto);
+    return this;
+  }
+  
+  @Override
+  public HDFSStoreFactory setMinorCompactionThreads(int count) {
+    configHolder.setMinorCompactionThreads(count);
+    return this;
+  }
+
+  @Override
+  public HDFSStoreFactory setMajorCompaction(boolean auto) {
+    configHolder.setMajorCompaction(auto);
+    return this;
+  }
+
+  @Override
+  public HDFSStoreFactory setMajorCompactionInterval(int count) {
+    configHolder.setMajorCompactionInterval(count);
+    return this;
+  }
+
+  @Override
+  public HDFSStoreFactory setMajorCompactionThreads(int count) {
+    configHolder.setMajorCompactionThreads(count);
+    return this;
+  }
+
+  @Override
+  public HDFSStoreFactory setInputFileSizeMax(int size) {
+    configHolder.setInputFileSizeMax(size);
+    return this;
+  }
+
+  @Override
+  public HDFSStoreFactory setInputFileCountMin(int count) {
+    configHolder.setInputFileCountMin(count);
+    return this;
+  }
+
+  @Override
+  public HDFSStoreFactory setInputFileCountMax(int count) {
+    configHolder.setInputFileCountMax(count);
+    return this;
+  }
+
+  @Override
+  public HDFSStoreFactory setPurgeInterval(int interval) {
+    configHolder.setPurgeInterval(interval);
+    return this;
+  }
+
+  @Override
+  public HDFSStoreFactory setDiskStoreName(String name) {
+    configHolder.setDiskStoreName(name);
+    return this;
+  }
+
+  @Override
+  public HDFSStoreFactory setMaxMemory(int memory) {
+    configHolder.setMaxMemory(memory);
+    return this;
+  }
+
+  @Override
+  public HDFSStoreFactory setBatchInterval(int intervalMillis) {
+    configHolder.setBatchInterval(intervalMillis);
+    return this;
+  }
+
+  @Override
+  public HDFSStoreFactory setBatchSize(int size) {
+    configHolder.setBatchSize(size);
+    return this;
+  }
+
+  @Override
+  public HDFSStoreFactory setBufferPersistent(boolean isPersistent) {
+    configHolder.setBufferPersistent(isPersistent);
+    return this;
+  }
+
+  @Override
+  public HDFSStoreFactory setSynchronousDiskWrite(boolean isSynchronous) {
+    configHolder.setSynchronousDiskWrite(isSynchronous);
+    return this;
+  }
+
+  @Override
+  public HDFSStoreFactory setDispatcherThreads(int dispatcherThreads) {
+    configHolder.setDispatcherThreads(dispatcherThreads);
+    return this;
+  }
+  
+  /**
+   * This method should not be called on this class.
+   * @see HDFSStoreFactory#create(String)
+   */
+  @Override
+  public HDFSStore create(String name) throws GemFireConfigException,
+      StoreExistsException {
+    throw new UnsupportedOperationException();
+  }
+
+  public static void assertIsPositive(String name, int count) {
+    if (count < 1) {
+      throw new IllegalArgumentException(
+          LocalizedStrings.DiskWriteAttributesImpl_0_HAS_TO_BE_POSITIVE_NUMBER_AND_THE_VALUE_GIVEN_1_IS_NOT_ACCEPTABLE
+              .toLocalizedString(new Object[] { name, count }));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreFactoryImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreFactoryImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreFactoryImpl.java
new file mode 100644
index 0000000..749f01c
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreFactoryImpl.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import com.gemstone.gemfire.GemFireConfigException;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.hdfs.HDFSStore;
+import com.gemstone.gemfire.cache.hdfs.StoreExistsException;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+
+
+/**
+ * Implementation of HDFSStoreFactory 
+ * 
+ */
+public class HDFSStoreFactoryImpl extends HDFSStoreCreation {
+  public static final String DEFAULT_ASYNC_QUEUE_ID_FOR_HDFS= "HDFS_QUEUE";
+  
+  private Cache cache;
+  
+  public HDFSStoreFactoryImpl(Cache cache) {
+    this(cache, null);
+  }
+  
+  public HDFSStoreFactoryImpl(Cache cache, HDFSStoreCreation config) {
+    super(config);
+    this.cache = cache;
+  }
+
+  @Override
+  public HDFSStore create(String name) {
+    if (name == null) {
+      throw new GemFireConfigException("HDFS store name not provided");
+    }
+    
+    this.configHolder.validate();
+    
+    HDFSStore result = null;
+    synchronized (this) {
+      if (this.cache instanceof GemFireCacheImpl) {
+        GemFireCacheImpl gfc = (GemFireCacheImpl) this.cache;
+        if (gfc.findHDFSStore(name) != null) {
+          throw new StoreExistsException(name);
+        }
+        
+        HDFSStoreImpl hsi = new HDFSStoreImpl(name, this.configHolder);
+        gfc.addHDFSStore(hsi);
+        result = hsi;
+      }
+    }
+    return result;
+  }
+
+  public static final String getEventQueueName(String regionPath) {
+    return HDFSStoreFactoryImpl.DEFAULT_ASYNC_QUEUE_ID_FOR_HDFS + "_"
+        + regionPath.replace('/', '_');
+  }
+
+  public HDFSStore getConfigView() {
+    return (HDFSStore) configHolder;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreImpl.java
new file mode 100644
index 0000000..b5d56b6
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreImpl.java
@@ -0,0 +1,638 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.hfile.BlockCache;
+import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.ConnectTimeoutException;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
+import com.gemstone.gemfire.cache.hdfs.HDFSStore;
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSCompactionManager;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSStoreDirector;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog.HoplogWriter;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogConfig;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.HoplogUtil;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.internal.cache.control.HeapMemoryMonitor;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.HFileStoreStatistics;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.internal.util.SingletonCallable;
+import com.gemstone.gemfire.internal.util.SingletonValue;
+import com.gemstone.gemfire.internal.util.SingletonValue.SingletonBuilder;
+
+/**
+ * Represents a HDFS based persistent store for region data.
+ * 
+ */
+public class HDFSStoreImpl implements HDFSStore {
+  
+  private volatile HDFSStoreConfigHolder configHolder; 
+  
+  private final SingletonValue<FileSystem> fs;
+
+  /**
+   * Used to make sure that only one thread creates the writer at a time. This prevents the dispatcher
+   * threads from cascading the Connection lock in DFS client see bug 51195
+   */
+  private final SingletonCallable<HoplogWriter> singletonWriter = new SingletonCallable<HoplogWriter>();
+
+  private final HFileStoreStatistics stats;
+  private final BlockCache blockCache;
+
+  private static HashSet<String> secureNameNodes = new HashSet<String>();
+  
+  private final boolean PERFORM_SECURE_HDFS_CHECK = Boolean.getBoolean(HoplogConfig.PERFORM_SECURE_HDFS_CHECK_PROP);
+  private static final Logger logger = LogService.getLogger();
+  protected final String logPrefix;
+  
+  static {
+    HdfsConfiguration.init();
+  }
+  
+  public HDFSStoreImpl(String name, final HDFSStore config) {
+    this.configHolder = new HDFSStoreConfigHolder(config);
+    configHolder.setName(name);
+
+    this.logPrefix = "<" + "HdfsStore:" + name + "> ";
+
+    stats = new HFileStoreStatistics(InternalDistributedSystem.getAnyInstance(), "HDFSStoreStatistics", name);
+
+    final Configuration hconf = new Configuration();
+        
+    // Set the block cache size.
+    // Disable the static block cache. We keep our own cache on the HDFS Store
+    // hconf.setFloat("hfile.block.cache.size", 0f);
+    if (this.getBlockCacheSize() != 0) {
+      long cacheSize = (long) (HeapMemoryMonitor.getTenuredPoolMaxMemory() * this.getBlockCacheSize() / 100);
+
+      // TODO use an off heap block cache if we're using off heap memory?
+      // See CacheConfig.instantiateBlockCache.
+      // According to Anthony, the off heap block cache is still
+      // experimental. Our own off heap cache might be a better bet.
+//      this.blockCache = new LruBlockCache(cacheSize,
+//          StoreFile.DEFAULT_BLOCKSIZE_SMALL, hconf, HFileSortedOplogFactory.convertStatistics(stats));
+      this.blockCache = new LruBlockCache(cacheSize, StoreFile.DEFAULT_BLOCKSIZE_SMALL, hconf);
+    } else {
+      this.blockCache = null;
+    }
+    
+    final String clientFile = config.getHDFSClientConfigFile();
+    fs = new SingletonValue<FileSystem>(new SingletonBuilder<FileSystem>() {
+      @Override
+      public FileSystem create() throws IOException {
+        return createFileSystem(hconf, clientFile, false);
+      }
+
+      @Override
+      public void postCreate() {
+      }
+      
+      @Override
+      public void createInProgress() {
+      }
+    });
+    
+    FileSystem fileSystem = null;
+    try {
+      fileSystem = fs.get();
+    } catch (Throwable ex) {
+      throw new HDFSIOException(ex.getMessage(),ex);
+    }    
+    //HDFSCompactionConfig has already been initialized
+    long cleanUpIntervalMillis = getPurgeInterval() * 60 * 1000;
+    Path cleanUpIntervalPath = new Path(getHomeDir(), HoplogConfig.CLEAN_UP_INTERVAL_FILE_NAME);
+    HoplogUtil.exposeCleanupIntervalMillis(fileSystem, cleanUpIntervalPath, cleanUpIntervalMillis);
+  }
+  
+  /**
+   * Creates a new file system every time.  
+   */
+  public FileSystem createFileSystem() {
+    Configuration hconf = new Configuration();
+    try {
+      return createFileSystem(hconf, this.getHDFSClientConfigFile(), true);
+    } catch (Throwable ex) {
+      throw new HDFSIOException(ex.getMessage(),ex);
+    }
+  }
+  
+  private FileSystem createFileSystem(Configuration hconf, String configFile, boolean forceNew) throws IOException {
+    FileSystem filesystem = null; 
+    
+      // load hdfs client config file if specified. The path is on local file
+      // system
+      if (configFile != null) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("{}Adding resource config file to hdfs configuration:" + configFile, logPrefix);
+        }
+        hconf.addResource(new Path(configFile));
+        
+        if (! new File(configFile).exists()) {
+          logger.warn(LocalizedMessage.create(LocalizedStrings.HOPLOG_HDFS_CLIENT_CONFIG_FILE_ABSENT, configFile));
+        }
+      }
+      
+      // This setting disables shutdown hook for file system object. Shutdown
+      // hook may cause FS object to close before the cache or store and
+      // unpredictable behavior. This setting is provided for GFXD like server
+      // use cases where FS close is managed by a server. This setting is not
+      // supported by old versions of hadoop, HADOOP-4829
+      hconf.setBoolean("fs.automatic.close", false);
+      
+      // Hadoop has a configuration parameter io.serializations that is a list of serialization 
+      // classes which can be used for obtaining serializers and deserializers. This parameter 
+      // by default contains avro classes. When a sequence file is created, it calls 
+      // SerializationFactory.getSerializer(keyclass). This internally creates objects using 
+      // reflection of all the classes that were part of io.serializations. But since, there is 
+      // no avro class available it throws an exception. 
+      // Before creating a sequenceFile, override the io.serializations parameter and pass only the classes 
+      // that are important to us. 
+      hconf.setStrings("io.serializations",
+          new String[]{"org.apache.hadoop.io.serializer.WritableSerialization"});
+      // create writer
+
+      SchemaMetrics.configureGlobally(hconf);
+      
+      String nameNodeURL = null;
+      if ((nameNodeURL = getNameNodeURL()) == null) {
+          nameNodeURL = hconf.get("fs.default.name");
+      }
+      
+      URI namenodeURI = URI.create(nameNodeURL);
+    
+    //if (! GemFireCacheImpl.getExisting().isHadoopGfxdLonerMode()) {
+      String authType = hconf.get("hadoop.security.authentication");
+      
+      //The following code handles Gemfire XD with secure HDFS
+      //A static set is used to cache all known secure HDFS NameNode urls.
+      UserGroupInformation.setConfiguration(hconf);
+
+      //Compare authentication method ignoring case to make GFXD future version complaint
+      //At least version 2.0.2 starts complaining if the string "kerberos" is not in all small case.
+      //However it seems current version of hadoop accept the authType in any case
+      if (authType.equalsIgnoreCase("kerberos")) {
+        
+        String principal = hconf.get(HoplogConfig.KERBEROS_PRINCIPAL);
+        String keyTab = hconf.get(HoplogConfig.KERBEROS_KEYTAB_FILE);
+       
+        if (!PERFORM_SECURE_HDFS_CHECK) {
+          if (logger.isDebugEnabled())
+            logger.debug("{}Ignore secure hdfs check", logPrefix);
+        } else {
+          if (!secureNameNodes.contains(nameNodeURL)) {
+            if (logger.isDebugEnabled())
+              logger.debug("{}Executing secure hdfs check", logPrefix);
+             try{
+              filesystem = FileSystem.newInstance(namenodeURI, hconf);
+              //Make sure no IOExceptions are generated when accessing insecure HDFS. 
+              filesystem.listFiles(new Path("/"),false);
+              throw new HDFSIOException("Gemfire XD HDFS client and HDFS cluster security levels do not match. The configured HDFS Namenode is not secured.");
+             } catch (IOException ex) {
+               secureNameNodes.add(nameNodeURL);
+             } finally {
+             //Close filesystem to avoid resource leak
+               if(filesystem != null) {
+                 closeFileSystemIgnoreError(filesystem);
+               }
+             }
+          }
+        }
+
+        // check to ensure the namenode principal is defined
+        String nameNodePrincipal = hconf.get("dfs.namenode.kerberos.principal");
+        if (nameNodePrincipal == null) {
+          throw new IOException(LocalizedStrings.GF_KERBEROS_NAMENODE_PRINCIPAL_UNDEF.toLocalizedString());
+        }
+        
+        // ok, the user specified a gfxd principal so we will try to login
+        if (principal != null) {
+          //If NameNode principal is the same as Gemfire XD principal, there is a 
+          //potential security hole
+          String regex = "[/@]";
+          if (nameNodePrincipal != null) {
+            String HDFSUser = nameNodePrincipal.split(regex)[0];
+            String GFXDUser = principal.split(regex)[0];
+            if (HDFSUser.equals(GFXDUser)) {
+              logger.warn(LocalizedMessage.create(LocalizedStrings.HDFS_USER_IS_SAME_AS_GF_USER, GFXDUser));
+            }
+          }
+          
+          // a keytab must exist if the user specifies a principal
+          if (keyTab == null) {
+            throw new IOException(LocalizedStrings.GF_KERBEROS_KEYTAB_UNDEF.toLocalizedString());
+          }
+          
+          // the keytab must exist as well
+          File f = new File(keyTab);
+          if (!f.exists()) {
+            throw new FileNotFoundException(LocalizedStrings.GF_KERBEROS_KEYTAB_FILE_ABSENT.toLocalizedString(f.getAbsolutePath()));
+          }
+
+          //Authenticate Gemfire XD principal to Kerberos KDC using Gemfire XD keytab file
+          String principalWithValidHost = SecurityUtil.getServerPrincipal(principal, "");
+          UserGroupInformation.loginUserFromKeytab(principalWithValidHost, keyTab);
+        } else {
+          logger.warn(LocalizedMessage.create(LocalizedStrings.GF_KERBEROS_PRINCIPAL_UNDEF));
+        }
+      }
+    //}
+
+    filesystem = getFileSystemFactory().create(namenodeURI, hconf, forceNew);
+    
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}Initialized FileSystem linked to " + filesystem.getUri()
+          + " " + filesystem.hashCode(), logPrefix);
+    }
+    return filesystem;
+  }
+
+  public FileSystem getFileSystem() throws IOException {
+    return fs.get();
+  }
+  
+  public FileSystem getCachedFileSystem() {
+    return fs.getCachedValue();
+  }
+
+  public SingletonCallable<HoplogWriter> getSingletonWriter() {
+    return this.singletonWriter;
+  }
+
+  private final SingletonCallable<Boolean> fsExists = new SingletonCallable<Boolean>();
+
+  public boolean checkFileSystemExists() throws IOException {
+    try {
+      return fsExists.runSerially(new Callable<Boolean>() {
+        @Override
+        public Boolean call() throws Exception {
+          FileSystem fileSystem = getCachedFileSystem();
+          if (fileSystem == null) {
+            return false;
+          }
+          return fileSystem.exists(new Path("/"));
+        }
+      });
+    } catch (Exception e) {
+      if (e instanceof IOException) {
+        throw (IOException)e;
+      }
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * This method executes a query on namenode. If the query succeeds, FS
+   * instance is healthy. If it fails, the old instance is closed and a new
+   * instance is created.
+   */
+  public void checkAndClearFileSystem() {
+    FileSystem fileSystem = getCachedFileSystem();
+    
+    if (fileSystem != null) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("{}Checking file system at " + fileSystem.getUri(), logPrefix);
+      }
+      try {
+        checkFileSystemExists();
+        if (logger.isDebugEnabled()) {
+          logger.debug("{}FS client is ok: " + fileSystem.getUri() + " "
+              + fileSystem.hashCode(), logPrefix);
+        }
+        return;
+      } catch (ConnectTimeoutException e) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("{}Hdfs unreachable, FS client is ok: "
+              + fileSystem.getUri() + " " + fileSystem.hashCode(), logPrefix);
+        }
+        return;
+      } catch (IOException e) {
+        logger.debug("IOError in filesystem checkAndClear ", e);
+        
+        // The file system is closed or NN is not reachable. It is safest to
+        // create a new FS instance. If the NN continues to remain unavailable,
+        // all subsequent read/write request will cause HDFSIOException. This is
+        // similar to the way hbase manages failures. This has a drawback
+        // though. A network blip will result in all connections to be
+        // recreated. However trying to preserve the connections and waiting for
+        // FS to auto-recover is not deterministic.
+        if (e instanceof RemoteException) {
+          e = ((RemoteException) e).unwrapRemoteException();
+        }
+
+        logger.warn(LocalizedMessage.create(LocalizedStrings.HOPLOG_HDFS_UNREACHABLE,
+            fileSystem.getUri()), e);
+      }
+
+      // compare and clear FS container. The fs container needs to be reusable
+      boolean result = fs.clear(fileSystem, true);
+      if (!result) {
+        // the FS instance changed after this call was initiated. Check again
+        logger.debug("{}Failed to clear FS ! I am inconsistent so retrying ..", logPrefix);
+        checkAndClearFileSystem();
+      } else {
+        closeFileSystemIgnoreError(fileSystem);
+      }      
+    }
+  }
+
+  private void closeFileSystemIgnoreError(FileSystem fileSystem) {
+    if (fileSystem == null) {
+      logger.debug("{}Trying to close null file system", logPrefix);
+      return;
+    }
+
+    try {
+      if (logger.isDebugEnabled()) {
+        logger.debug("{}Closing file system at " + fileSystem.getUri() + " "
+            + fileSystem.hashCode(), logPrefix);
+      }
+      fileSystem.close();
+    } catch (Exception e) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Failed to close file system at " + fileSystem.getUri()
+            + " " + fileSystem.hashCode(), e);
+      }
+    }
+  }
+
+  public HFileStoreStatistics getStats() {
+    return stats;
+  }
+  
+  public BlockCache getBlockCache() {
+    return blockCache;
+  }
+
+  public void close() {
+    logger.debug("{}Closing file system: " + getName(), logPrefix);
+    stats.close();
+    blockCache.shutdown();
+    //Might want to clear the block cache, but it should be dereferenced.
+    
+    // release DDL hoplog organizer for this store. Also shutdown compaction
+    // threads. These two resources hold references to GemfireCacheImpl
+    // instance. Any error is releasing this resources is not critical and needs
+    // be ignored.
+    try {
+      HDFSCompactionManager manager = HDFSCompactionManager.getInstance(this);
+      if (manager != null) {
+        manager.reset();
+      }
+    } catch (Exception e) {
+      logger.info(e);
+    }
+    
+    // once this store is closed, this store should not be used again
+    FileSystem fileSystem = fs.clear(false);
+    if (fileSystem != null) {
+      closeFileSystemIgnoreError(fileSystem);
+    }    
+  }
+  
+  /**
+   * Test hook to remove all of the contents of the the folder
+   * for this HDFS store from HDFS.
+   * @throws IOException 
+   */
+  public void clearFolder() throws IOException {
+    getFileSystem().delete(new Path(getHomeDir()), true);
+  }
+  
+  @Override
+  public void destroy() {
+    Collection<String> regions = HDFSRegionDirector.getInstance().getRegionsInStore(this);
+    if(!regions.isEmpty()) {
+      throw new IllegalStateException("Cannot destroy a HDFS store that still contains regions: " + regions); 
+    }
+    close();
+    HDFSStoreDirector.getInstance().removeHDFSStore(this.getName());
+  }
+
+  @Override
+  public synchronized HDFSStore alter(HDFSStoreMutator mutator) {
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}Altering hdfsStore " + this, logPrefix);
+      logger.debug("{}Mutator " + mutator, logPrefix);
+    }
+    HDFSStoreConfigHolder newHolder = new HDFSStoreConfigHolder(configHolder);
+    newHolder.copyFrom(mutator);
+    newHolder.validate();
+    HDFSStore oldStore = configHolder;
+    configHolder = newHolder;
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}Resuult of Alter " + this, logPrefix);
+    }
+    return (HDFSStore) oldStore;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    builder.append("HDFSStoreImpl [");
+    if (configHolder != null) {
+      builder.append("configHolder=");
+      builder.append(configHolder);
+    }
+    builder.append("]");
+    return builder.toString();
+  }
+
+  @Override
+  public String getName() {
+    return configHolder.getName();
+  }
+
+  @Override
+  public String getNameNodeURL() {
+    return configHolder.getNameNodeURL();
+  }
+
+  @Override
+  public String getHomeDir() {
+    return configHolder.getHomeDir();
+  }
+
+  @Override
+  public String getHDFSClientConfigFile() {
+    return configHolder.getHDFSClientConfigFile();
+  }
+
+  @Override
+  public float getBlockCacheSize() {
+    return configHolder.getBlockCacheSize();
+  }
+
+  @Override
+  public int getWriteOnlyFileRolloverSize() {
+    return configHolder.getWriteOnlyFileRolloverSize();
+  }
+
+  @Override
+  public int getWriteOnlyFileRolloverInterval() {
+    return configHolder.getWriteOnlyFileRolloverInterval();
+  }
+
+  @Override
+  public boolean getMinorCompaction() {
+    return configHolder.getMinorCompaction();
+  }
+
+  @Override
+  public int getMinorCompactionThreads() {
+    return configHolder.getMinorCompactionThreads();
+  }
+
+  @Override
+  public boolean getMajorCompaction() {
+    return configHolder.getMajorCompaction();
+  }
+
+  @Override
+  public int getMajorCompactionInterval() {
+    return configHolder.getMajorCompactionInterval();
+  }
+
+  @Override
+  public int getMajorCompactionThreads() {
+    return configHolder.getMajorCompactionThreads();
+  }
+
+
+  @Override
+  public int getInputFileSizeMax() {
+    return configHolder.getInputFileSizeMax();
+  }
+
+  @Override
+  public int getInputFileCountMin() {
+    return configHolder.getInputFileCountMin();
+  }
+
+  @Override
+  public int getInputFileCountMax() {
+    return configHolder.getInputFileCountMax();
+  }
+
+  @Override
+  public int getPurgeInterval() {
+    return configHolder.getPurgeInterval();
+  }
+
+  @Override
+  public String getDiskStoreName() {
+    return configHolder.getDiskStoreName();
+  }
+
+  @Override
+  public int getMaxMemory() {
+    return configHolder.getMaxMemory();
+  }
+
+  @Override
+  public int getBatchSize() {
+    return configHolder.getBatchSize();
+  }
+
+  @Override
+  public int getBatchInterval() {
+    return configHolder.getBatchInterval();
+  }
+
+  @Override
+  public boolean getBufferPersistent() {
+    return configHolder.getBufferPersistent();
+  }
+
+  @Override
+  public boolean getSynchronousDiskWrite() {
+    return configHolder.getSynchronousDiskWrite();
+  }
+
+  @Override
+  public int getDispatcherThreads() {
+    return configHolder.getDispatcherThreads();
+  }
+  
+  @Override
+  public HDFSStoreMutator createHdfsStoreMutator() {
+    return new HDFSStoreMutatorImpl();
+  }
+
+  public FileSystemFactory getFileSystemFactory() {
+    return new DistributedFileSystemFactory();
+  }
+
+  /*
+   * Factory to create HDFS file system instances
+   */
+  static public interface FileSystemFactory {
+    public FileSystem create(URI namenode, Configuration conf, boolean forceNew) throws IOException;
+  }
+
+  /*
+   * File system factory implementations for creating instances of file system
+   * connected to distributed HDFS cluster
+   */
+  public class DistributedFileSystemFactory implements FileSystemFactory {
+    private final boolean ALLOW_TEST_FILE_SYSTEM = Boolean.getBoolean(HoplogConfig.ALLOW_LOCAL_HDFS_PROP);
+    private final boolean USE_FS_CACHE = Boolean.getBoolean(HoplogConfig.USE_FS_CACHE);
+
+    @Override
+    public FileSystem create(URI nn, Configuration conf, boolean create) throws IOException {
+      FileSystem filesystem;
+
+      if (USE_FS_CACHE && !create) {
+        filesystem = FileSystem.get(nn, conf);
+      } else {
+        filesystem = FileSystem.newInstance(nn, conf);
+      }
+
+      if (filesystem instanceof LocalFileSystem && !ALLOW_TEST_FILE_SYSTEM) {
+        closeFileSystemIgnoreError(filesystem);
+        throw new IllegalStateException(
+            LocalizedStrings.HOPLOG_TRYING_TO_CREATE_STANDALONE_SYSTEM.toLocalizedString(getNameNodeURL()));
+      }
+
+      return filesystem;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreMutatorImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreMutatorImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreMutatorImpl.java
new file mode 100644
index 0000000..203e623
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreMutatorImpl.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import com.gemstone.gemfire.cache.hdfs.HDFSStore;
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+
+public class HDFSStoreMutatorImpl implements HDFSStoreMutator {
+  private HDFSStoreConfigHolder configHolder;
+  private Boolean autoCompact;
+  private Boolean autoMajorCompact;
+
+  public HDFSStoreMutatorImpl() {
+    configHolder = new HDFSStoreConfigHolder();
+    configHolder.resetDefaultValues();
+  }
+
+  public HDFSStoreMutatorImpl(HDFSStore store) {
+    configHolder = new HDFSStoreConfigHolder(store);
+  }
+  
+  public HDFSStoreMutator setWriteOnlyFileRolloverSize(int maxFileSize) {
+    configHolder.setWriteOnlyFileRolloverSize(maxFileSize);
+    return this;
+  }
+  @Override
+  public int getWriteOnlyFileRolloverSize() {
+    return configHolder.getWriteOnlyFileRolloverSize();
+  }
+
+  @Override
+  public HDFSStoreMutator setWriteOnlyFileRolloverInterval(int count) {
+    configHolder.setWriteOnlyFileRolloverInterval(count);
+    return this;
+  }
+  @Override
+  public int getWriteOnlyFileRolloverInterval() {
+    return configHolder.getWriteOnlyFileRolloverInterval();
+  }
+
+  @Override
+  public HDFSStoreMutator setMinorCompaction(boolean auto) {
+    autoCompact = Boolean.valueOf(auto);
+    configHolder.setMinorCompaction(auto);
+    return null;
+  }
+  @Override
+  public Boolean getMinorCompaction() {
+    return autoCompact;
+  }
+  
+  @Override
+  public HDFSStoreMutator setMinorCompactionThreads(int count) {
+    configHolder.setMinorCompactionThreads(count);
+    return this;
+  }
+  @Override
+  public int getMinorCompactionThreads() {
+    return configHolder.getMinorCompactionThreads();
+  }
+  
+  @Override
+  public HDFSStoreMutator setMajorCompaction(boolean auto) {
+    autoMajorCompact = Boolean.valueOf(auto);
+    configHolder.setMajorCompaction(auto);
+    return this;
+  }
+  @Override
+  public Boolean getMajorCompaction() {
+    return autoMajorCompact;
+  }
+
+  @Override
+  public HDFSStoreMutator setMajorCompactionInterval(int count) {
+    configHolder.setMajorCompactionInterval(count);
+    return this;
+  }
+  @Override
+  public int getMajorCompactionInterval() {
+    return configHolder.getMajorCompactionInterval();
+  }
+
+  @Override
+  public HDFSStoreMutator setMajorCompactionThreads(int count) {
+    configHolder.setMajorCompactionThreads(count);
+    return this;
+  }
+  @Override
+  public int getMajorCompactionThreads() {
+    return configHolder.getMajorCompactionThreads();
+  }
+
+  @Override
+  public HDFSStoreMutator setInputFileSizeMax(int size) {
+    configHolder.setInputFileSizeMax(size);
+    return this;
+  }
+  @Override
+  public int getInputFileSizeMax() {
+    return configHolder.getInputFileSizeMax();
+  }
+  
+  @Override
+  public HDFSStoreMutator setInputFileCountMin(int count) {
+    configHolder.setInputFileCountMin(count);
+    return this;
+  }
+  @Override
+  public int getInputFileCountMin() {
+    return configHolder.getInputFileCountMin();
+  }
+  
+  @Override
+  public HDFSStoreMutator setInputFileCountMax(int count) {
+    configHolder.setInputFileCountMax(count);
+    return this;
+  }
+  @Override
+  public int getInputFileCountMax() {
+    return configHolder.getInputFileCountMax();
+  }
+  
+  @Override
+  public HDFSStoreMutator setPurgeInterval(int interval) {
+    configHolder.setPurgeInterval(interval);
+    return this;
+  }
+  @Override
+  public int getPurgeInterval() {
+    return configHolder.getPurgeInterval();
+  }
+
+  @Override
+  public int getBatchSize() {
+    return configHolder.batchSize;
+  }
+  @Override
+  public HDFSStoreMutator setBatchSize(int size) {
+    configHolder.setBatchSize(size);
+    return this;
+  }
+
+  
+  @Override
+  public int getBatchInterval() {
+    return configHolder.batchIntervalMillis;
+  }
+  @Override
+  public HDFSStoreMutator setBatchInterval(int interval) {
+    configHolder.setBatchInterval(interval);
+    return this;
+  }
+    
+  public static void assertIsPositive(String name, int count) {
+    if (count < 1) {
+      throw new IllegalArgumentException(
+          LocalizedStrings.DiskWriteAttributesImpl_0_HAS_TO_BE_POSITIVE_NUMBER_AND_THE_VALUE_GIVEN_1_IS_NOT_ACCEPTABLE
+              .toLocalizedString(new Object[] { name, count }));
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    builder.append("HDFSStoreMutatorImpl [");
+    if (configHolder != null) {
+      builder.append("configHolder=");
+      builder.append(configHolder);
+      builder.append(", ");
+    }
+    if (autoCompact != null) {
+      builder.append("MinorCompaction=");
+      builder.append(autoCompact);
+      builder.append(", ");
+    }
+    if (getMajorCompaction() != null) {
+      builder.append("autoMajorCompaction=");
+      builder.append(getMajorCompaction());
+      builder.append(", ");
+    }
+    builder.append("]");
+    return builder.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSWriteOnlyStoreEventListener.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSWriteOnlyStoreEventListener.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSWriteOnlyStoreEventListener.java
new file mode 100644
index 0000000..0298523
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSWriteOnlyStoreEventListener.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEvent;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer;
+import com.gemstone.gemfire.i18n.LogWriterI18n;
+import com.gemstone.gemfire.internal.cache.BucketRegion;
+import com.gemstone.gemfire.internal.cache.ForceReattemptException;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.PrimaryBucketException;
+import com.gemstone.gemfire.internal.cache.execute.BucketMovedException;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+
+/**
+ * Listener that persists events to a write only HDFS store
+ *
+ */
+public class HDFSWriteOnlyStoreEventListener implements
+    AsyncEventListener {
+
+  private final LogWriterI18n logger;
+  private volatile boolean senderStopped = false; 
+  private final FailureTracker failureTracker = new FailureTracker(10L, 60 * 1000L, 1.5f);
+  
+  
+  public HDFSWriteOnlyStoreEventListener(LogWriterI18n logger) {
+    this.logger = logger;
+  }
+  
+  @Override
+  public void close() {
+    senderStopped = true;
+  }
+
+  @Override
+  public boolean processEvents(List<AsyncEvent> events) {
+    if (Hoplog.NOP_WRITE) {
+      return true;
+    }
+
+    if (logger.fineEnabled())
+      logger.fine("HDFSWriteOnlyStoreEventListener: A total of " + events.size() + " events are sent from GemFire to persist on HDFS");
+    boolean success = false;
+    try {
+      failureTracker.sleepIfRetry();
+      HDFSGatewayEventImpl hdfsEvent = null;
+      int previousBucketId = -1;
+      BatchManager bm = null;
+      for (AsyncEvent asyncEvent : events) {
+        if (senderStopped){
+          if (logger.fineEnabled()) {
+            logger.fine("HDFSWriteOnlyStoreEventListener.processEvents: Cache is closing down. Ignoring the batch of data.");
+          }
+          return false;
+        }
+        hdfsEvent = (HDFSGatewayEventImpl)asyncEvent;
+        if (previousBucketId != hdfsEvent.getBucketId()){
+          if (previousBucketId != -1) 
+            persistBatch(bm, previousBucketId);
+          
+          previousBucketId = hdfsEvent.getBucketId();
+          bm = new BatchManager();
+        }
+        bm.addEvent(hdfsEvent);
+      }
+      try {
+        persistBatch(bm, hdfsEvent.getBucketId());
+      } catch (BucketMovedException e) {
+        logger.fine("Batch could not be written to HDFS as the bucket moved. bucket id: " + 
+            hdfsEvent.getBucketId() + " Exception: " + e);
+        return false;
+      }
+      success = true;
+    } catch (IOException e) {
+      logger.warning(LocalizedStrings.HOPLOG_FLUSH_FOR_BATCH_FAILED, e);
+      return false;
+    }
+    catch (ClassNotFoundException e) {
+      logger.warning(LocalizedStrings.HOPLOG_FLUSH_FOR_BATCH_FAILED, e);
+      return false;
+    }
+    catch (CacheClosedException e) {
+      // exit silently
+      if (logger.fineEnabled())
+        logger.fine(e);
+      return false;
+    } catch (ForceReattemptException e) {
+      if (logger.fineEnabled())
+        logger.fine(e);
+      return false;
+    } catch (InterruptedException e1) {
+      // TODO Auto-generated catch block
+      e1.printStackTrace();
+    } finally {
+      failureTracker.record(success);
+    }
+    return true;
+  }
+  
+  /**
+   * Persists batches of multiple regions specified by the batch manager
+   * 
+   */
+  private void persistBatch(BatchManager bm, int bucketId) throws IOException, ForceReattemptException {
+    Iterator<Map.Entry<LocalRegion,ArrayList<QueuedPersistentEvent>>> eventsPerRegion = 
+        bm.iterator();
+    HoplogOrganizer bucketOrganizer = null; 
+    while (eventsPerRegion.hasNext()) {
+      Map.Entry<LocalRegion, ArrayList<QueuedPersistentEvent>> eventsForARegion = eventsPerRegion.next();
+      bucketOrganizer = getOrganizer((PartitionedRegion) eventsForARegion.getKey(), bucketId);
+      // bucket organizer cannot be null. 
+      if (bucketOrganizer == null)
+        throw new BucketMovedException("Bucket moved. BucketID: " + bucketId + "  HdfsRegion: " +  eventsForARegion.getKey().getName());
+      bucketOrganizer.flush(eventsForARegion.getValue().iterator(), eventsForARegion.getValue().size());
+      if (logger.fineEnabled()) {
+        logger.fine("Batch written to HDFS of size " +  eventsForARegion.getValue().size() + 
+            " for region " + eventsForARegion.getKey());
+      }
+    }
+  }
+
+  private HoplogOrganizer getOrganizer(PartitionedRegion region, int bucketId) {
+    BucketRegion br = region.getDataStore().getLocalBucketById(bucketId);
+    if (br == null) {
+      // got rebalanced or something
+      throw new BucketMovedException("Bucket region is no longer available. BucketId: "+
+          bucketId + " HdfsRegion: " +  region.getName());
+    }
+
+    return br.getHoplogOrganizer();
+  }
+  
+  /**
+   * Sorts out events of the multiple regions into lists per region 
+   *
+   */
+  private class BatchManager implements Iterable<Map.Entry<LocalRegion,ArrayList<QueuedPersistentEvent>>> {
+    private HashMap<LocalRegion, ArrayList<QueuedPersistentEvent>> regionBatches = 
+        new HashMap<LocalRegion, ArrayList<QueuedPersistentEvent>>();
+    
+    public void addEvent (HDFSGatewayEventImpl hdfsEvent) throws IOException, ClassNotFoundException {
+      LocalRegion region = (LocalRegion) hdfsEvent.getRegion();
+      ArrayList<QueuedPersistentEvent> regionList = regionBatches.get(region);
+      if (regionList == null) {
+        regionList = new ArrayList<QueuedPersistentEvent>();
+        regionBatches.put(region, regionList);
+      }
+      regionList.add(new UnsortedHDFSQueuePersistedEvent(hdfsEvent));
+    }
+
+    @Override
+    public Iterator<Map.Entry<LocalRegion,ArrayList<QueuedPersistentEvent>>> iterator() {
+      return regionBatches.entrySet().iterator();
+    }
+    
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HoplogListenerForRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HoplogListenerForRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HoplogListenerForRegion.java
new file mode 100644
index 0000000..c7ba23f
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HoplogListenerForRegion.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogListener;
+
+/**
+ * Objects of this class needs to be created for every region. These objects 
+ * listen to the oplog events and take appropriate action.   
+ *
+ */
+public class HoplogListenerForRegion implements HoplogListener {
+
+  private List<HoplogListener> otherListeners = new CopyOnWriteArrayList<HoplogListener>();
+
+  public HoplogListenerForRegion() {
+    
+  }
+
+  @Override
+  public void hoplogCreated(String regionFolder, int bucketId,
+      Hoplog... oplogs) throws IOException {
+    for (HoplogListener listener : this.otherListeners) {
+      listener.hoplogCreated(regionFolder, bucketId, oplogs);
+    }
+  }
+
+  @Override
+  public void hoplogDeleted(String regionFolder, int bucketId,
+      Hoplog... oplogs) {
+    for (HoplogListener listener : this.otherListeners) {
+      try {
+        listener.hoplogDeleted(regionFolder, bucketId, oplogs);
+      } catch (IOException e) {
+        // TODO handle
+        throw new HDFSIOException(e.getLocalizedMessage(), e);
+      }
+    }
+  }
+
+  public void addListener(HoplogListener listener) {
+    this.otherListeners.add(listener);
+  }
+
+  @Override
+  public void compactionCompleted(String region, int bucket, boolean isMajor) {
+    for (HoplogListener listener : this.otherListeners) {
+      listener.compactionCompleted(region, bucket, isMajor);
+    }
+  }
+}