You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ag...@apache.org on 2016/05/03 23:45:28 UTC

[14/60] [abbrv] incubator-geode git commit: GEODE-1072: Removing HDFS related code

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSParallelGatewaySenderQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSParallelGatewaySenderQueue.java
deleted file mode 100644
index 1e6a034..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSParallelGatewaySenderQueue.java
+++ /dev/null
@@ -1,471 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-
-import com.gemstone.gemfire.cache.CacheException;
-import com.gemstone.gemfire.cache.EntryNotFoundException;
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.i18n.LogWriterI18n;
-import com.gemstone.gemfire.internal.SystemTimer;
-import com.gemstone.gemfire.internal.SystemTimer.SystemTimerTask;
-import com.gemstone.gemfire.internal.cache.ColocationHelper;
-import com.gemstone.gemfire.internal.cache.ForceReattemptException;
-import com.gemstone.gemfire.internal.cache.LocalRegion;
-import com.gemstone.gemfire.internal.cache.PartitionedRegion;
-import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
-import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl;
-import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
-
-/**
- * Parallel Gateway Sender Queue extended for HDFS functionality 
- *
- */
-public class HDFSParallelGatewaySenderQueue extends ParallelGatewaySenderQueue {
-
-  private int currentBucketIndex = 0;
-  private int elementsPeekedAcrossBuckets = 0;
-  private SystemTimer rollListTimer = null;
-  public static final String ROLL_SORTED_LIST_TIME_INTERVAL_MS__PROP = "gemfire.ROLL_SORTED_LIST_TIME_INTERVAL_MS";
-  private final int ROLL_SORTED_LIST_TIME_INTERVAL_MS = Integer.getInteger(ROLL_SORTED_LIST_TIME_INTERVAL_MS__PROP, 3000);
-  
-  public HDFSParallelGatewaySenderQueue(AbstractGatewaySender sender,
-      Set<Region> userPRs, int idx, int nDispatcher) {
-     
-    super(sender, userPRs, idx, nDispatcher);
-    //only first dispatcher Hemant?
-    if (sender.getBucketSorted() && this.index == 0) {
-      rollListTimer = new SystemTimer(sender.getCache().getDistributedSystem(),
-          true);
-      // schedule the task to roll the skip lists
-      rollListTimer.scheduleAtFixedRate(new RollSortedListsTimerTask(), 
-          ROLL_SORTED_LIST_TIME_INTERVAL_MS, ROLL_SORTED_LIST_TIME_INTERVAL_MS);
-    }
-  }
-  
-  @Override
-  public Object peek() throws InterruptedException, CacheException {
-    /* If you call peek and use super.peek it leads to the following exception.
-     * So I'm adding an explicit UnsupportedOperationException.
-     Caused by: java.lang.ClassCastException: com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue cannot be cast to com.gemstone.gemfire.internal.cache.BucketRegionQueue
-        at com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue.getRandomPrimaryBucket(ParallelGatewaySenderQueue.java:964)
-        at com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue.peek(ParallelGatewaySenderQueue.java:1078)
-     */
-    throw new UnsupportedOperationException();
-  }
-  
-  
-  @Override
-  public void cleanUp() {
-    super.cleanUp();
-    cancelRollListTimer();
-  }
-  
-  private void cancelRollListTimer() {
-    if (rollListTimer != null) {
-      rollListTimer.cancel();
-      rollListTimer = null;
-    }
-  }
-  /**
-   * A call to this function peeks elements from the first local primary bucket. 
-   * Next call to this function peeks elements from the next local primary 
-   * bucket and so on.  
-   */
-  @Override
-  public List peek(int batchSize, int timeToWait) throws InterruptedException,
-  CacheException {
-    
-    List batch = new ArrayList();
-    
-    int batchSizeInBytes = batchSize*1024*1024;
-    PartitionedRegion prQ = getRandomShadowPR();
-    if (prQ == null || prQ.getLocalMaxMemory() == 0) {
-      try {
-        Thread.sleep(50);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
-      blockProcesorThreadIfRequired();
-      return batch;
-    }
-    
-    ArrayList list = null;
-    ArrayList<Integer> pbuckets = new ArrayList<Integer>(prQ
-        .getDataStore().getAllLocalPrimaryBucketIds());
-    ArrayList<Integer> buckets = new ArrayList<Integer>();
-    for(Integer i : pbuckets) {
-    	if(i % this.nDispatcher == this.index)
-    		buckets.add(i);
-    }
-    // In case of failures, peekedEvents would possibly have some elements 
-    // add them. 
-    if (this.resetLastPeeked) {
-      int previousBucketId = -1;
-      boolean stillPrimary = true; 
-      Iterator<GatewaySenderEventImpl>  iter = peekedEvents.iterator();
-      // we need to remove the events of the bucket that are no more primary on 
-      // this node as they cannot be persisted from this node. 
-      while(iter.hasNext()) {
-        HDFSGatewayEventImpl hdfsEvent = (HDFSGatewayEventImpl)iter.next();
-        if (previousBucketId != hdfsEvent.getBucketId()){
-          stillPrimary = buckets.contains(hdfsEvent.getBucketId());
-          previousBucketId = hdfsEvent.getBucketId();
-        }
-        if (stillPrimary)
-          batch.add(hdfsEvent);
-        else {
-          iter.remove();
-        }
-      }
-      this.resetLastPeeked = false;
-    }
-    
-    if (buckets.size() == 0) {
-      // Sleep a bit before trying again. provided by Dan
-      try {
-        Thread.sleep(50);
-      }
-      catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
-      return batch;
-    }
-    
-    if (this.sender.getBucketSorted()) {
-      
-    }
-    
-    // Each call to this function returns index of next bucket 
-    // that is to be processed. This function takes care 
-    // of the bucket sequence that is peeked by a sequence of 
-    // peek calls. 
-    // If there are bucket movements between two consecutive 
-    // calls to this function then there is chance that a bucket 
-    // is processed twice while another one is skipped. But, that is 
-    // ok because in the next round, it will be processed. 
-    Integer bIdIndex = getCurrentBucketIndex(buckets.size());
-    
-    // If we have gone through all the buckets once and no  
-    // elements were peeked from any of the buckets, take a nap.  
-    // This always sleep in the first call but that should be ok  
-    // because the timeToWait in practical use cases would be greater 
-    // than this sleep of 100 ms.  
-    if (bIdIndex == 0 && getAndresetElementsPeekedAcrossBuckets() == 0) { 
-      try { 
-        Thread.sleep(100); 
-      } catch (InterruptedException e) { 
-        Thread.currentThread().interrupt(); 
-      } 
-    } 
-    
-    HDFSBucketRegionQueue hrq = ((HDFSBucketRegionQueue)prQ
-        .getDataStore().getLocalBucketById(buckets.get(bIdIndex)));
-    
-    if (hrq == null) {
-      // bucket moved to another node after getAllLocalPrimaryBucketIds
-      // was called. Peeking not possible. return. 
-      return batch;
-    }
-    long entriesWaitingTobePeeked = hrq.totalEntries();
-    
-    if (entriesWaitingTobePeeked == 0) {
-      blockProcesorThreadIfRequired();
-      return batch;
-    }
-    
-    long currentTimeInMillis = System.currentTimeMillis();
-    long bucketSizeInBytes = hrq.getQueueSizeInBytes();
-    if (((currentTimeInMillis - hrq.getLastPeekTimeInMillis()) >  timeToWait)  
-        || ( bucketSizeInBytes > batchSizeInBytes)
-        || hrq.shouldDrainImmediately()) {
-      // peek now
-      if (logger.isDebugEnabled()) { 
-        logger.debug("Peeking queue " + hrq.getId()   + ": bucketSizeInBytes " + bucketSizeInBytes
-            + ":  batchSizeInBytes" + batchSizeInBytes
-            + ":  timeToWait" + timeToWait
-            + ":  (currentTimeInMillis - hrq.getLastPeekTimeInMillis())" + (currentTimeInMillis - hrq.getLastPeekTimeInMillis()));
-      }
-
-      list = peekAhead(buckets.get(bIdIndex), hrq);
-      
-      if (list != null && list.size() != 0 ) {
-        for (Object object : list) {
-          batch.add(object);
-          peekedEvents.add((HDFSGatewayEventImpl)object);
-        }
-      }
-    }
-    else {
-      blockProcesorThreadIfRequired();
-    }
-    if (logger.isDebugEnabled()  &&  batch.size() > 0) {
-      logger.debug(this + ":  Peeked a batch of " + batch.size() + " entries");
-    }
-    
-    setElementsPeekedAcrossBuckets(batch.size()); 
-    
-    return batch;
-  }
-  
-  /**
-   * This function maintains an index of the last processed bucket.
-   * When it is called, it returns index of the next bucket. 
-   * @param totalBuckets
-   * @return current bucket index
-   */
-  private int getCurrentBucketIndex(int totalBuckets) {
-    int retBucket = currentBucketIndex;
-    if (retBucket >=  totalBuckets) {
-      currentBucketIndex = 0;
-      retBucket = 0;
-    }
-    
-    currentBucketIndex++;
-    
-    return retBucket;
-  }
-  
-  @Override
-  public void remove(int batchSize) throws CacheException {
-    int destroyed = 0;
-    HDFSGatewayEventImpl event = null;
-    
-    if (this.peekedEvents.size() > 0)
-      event = (HDFSGatewayEventImpl)this.peekedEvents.remove();
-    
-    while (event != null && destroyed < batchSize) {
-      Region currentRegion = event.getRegion();
-      int currentBucketId = event.getBucketId();
-      int bucketId = event.getBucketId();
-        
-      ArrayList<HDFSGatewayEventImpl> listToDestroy = new ArrayList<HDFSGatewayEventImpl>();
-      ArrayList<Object> destroyedSeqNum = new ArrayList<Object>();
-      
-      // create a batch of all the entries of a bucket 
-      while (bucketId == currentBucketId) {
-        listToDestroy.add(event);
-        destroyedSeqNum.add(event.getShadowKey());
-        destroyed++;
-
-        if (this.peekedEvents.size() == 0 || (destroyed) >= batchSize) {
-          event = null; 
-          break;
-        }
-
-        event = (HDFSGatewayEventImpl)this.peekedEvents.remove();
-
-        bucketId = event.getBucketId();
-
-        if (!this.sender.isRunning()){
-          if (logger.isDebugEnabled()) {
-            logger.debug("ParallelGatewaySenderQueue#remove: Cache is closing down. Ignoring remove request.");
-          }
-          return;
-        }
-      }
-      try {
-        HDFSBucketRegionQueue brq = getBucketRegionQueue((PartitionedRegion) currentRegion, currentBucketId);
-        
-        if (brq != null) {
-          // destroy the entries from the bucket 
-          brq.destroyKeys(listToDestroy);
-          // Adding the removed event to the map for BatchRemovalMessage
-          // We need to provide the prQ as there could be multiple
-          // queue in a PGS now.
-          PartitionedRegion prQ = brq.getPartitionedRegion();
-          addRemovedEvents(prQ, currentBucketId, destroyedSeqNum);
-        }
-        
-      } catch (ForceReattemptException e) {
-        if (logger.isDebugEnabled()) {
-          logger.debug("ParallelGatewaySenderQueue#remove: " + "Got ForceReattemptException for " + this
-          + " for bucket = " + bucketId);
-        }
-      }
-      catch(EntryNotFoundException e) {
-        if (logger.isDebugEnabled()) {
-          logger.debug("ParallelGatewaySenderQueue#remove: " + "Got EntryNotFoundException for " + this
-            + " for bucket = " + bucketId );
-        }
-      }
-    }
-  }
-  
-  /** 
-  * Keeps a track of number of elements peeked across all buckets.  
-  */ 
-  private void setElementsPeekedAcrossBuckets(int peekedElements) { 
-    this.elementsPeekedAcrossBuckets +=peekedElements; 
-  } 
-  
-  /** 
-  * Returns the number of elements peeked across all buckets. Also, 
-  * resets this counter. 
-  */ 
-  private int getAndresetElementsPeekedAcrossBuckets() { 
-    int peekedElements = this.elementsPeekedAcrossBuckets; 
-    this.elementsPeekedAcrossBuckets = 0; 
-    return peekedElements; 
-  } 
-
-  @Override
-  public void remove() throws CacheException {
-    throw new UnsupportedOperationException("Method HDFSParallelGatewaySenderQueue#remove is not supported");
-  }
- 
-  @Override
-  public void put(Object object) throws InterruptedException, CacheException {
-    super.put(object);
-  }
-  
-  protected ArrayList peekAhead(int bucketId, HDFSBucketRegionQueue hrq) throws CacheException {
-    
-    if (logger.isDebugEnabled()) {
-      logger.debug(this + ": Peekahead for the bucket " + bucketId);
-    }
-    ArrayList  list = hrq.peekABatch();
-    if (logger.isDebugEnabled() && list != null ) {
-      logger.debug(this + ": Peeked" + list.size() + "objects from bucket " + bucketId);
-    }
-
-    return list;
-  }
-  
-  @Override
-  public Object take() {
-    throw new UnsupportedOperationException("take() is not supported for " + HDFSParallelGatewaySenderQueue.class.toString());
-  }
-  
-  protected boolean isUsedForHDFS()
-  {
-    return true;
-  }
-  
-  @Override
-  protected void afterRegionAdd (PartitionedRegion userPR) {
-  }
-  
-  /**
-   * gets the value for region key from the HDFSBucketRegionQueue 
- * @param region 
-   * @throws ForceReattemptException 
-   */
-  public HDFSGatewayEventImpl get(PartitionedRegion region, byte[] regionKey, int bucketId) throws ForceReattemptException  {
-    try {
-      HDFSBucketRegionQueue brq = getBucketRegionQueue(region, bucketId);
-      
-      if (brq ==null)
-        return null;
-      
-      return brq.getObjectForRegionKey(region, regionKey);
-    } catch(EntryNotFoundException e) {
-      if (logger.isDebugEnabled()) {
-        logger.debug("HDFSParallelGatewaySenderQueue#get: " + "Got EntryNotFoundException for " + this
-            + " for bucket = " + bucketId);
-      }
-    }
-    return null;
-  }
-
-  @Override
-  public void clear(PartitionedRegion pr, int bucketId) {
-    HDFSBucketRegionQueue brq;
-    try {
-      brq = getBucketRegionQueue(pr, bucketId);
-      if (brq == null)
-        return;
-      brq.clear();
-    } catch (ForceReattemptException e) {
-      //do nothing, bucket was destroyed.
-    }
-  }
-  
-  @Override
-  public int size(PartitionedRegion pr, int bucketId) throws ForceReattemptException {
-   HDFSBucketRegionQueue hq = getBucketRegionQueue(pr, bucketId);
-   return hq.size();
-  }
-
-  public HDFSBucketRegionQueue getBucketRegionQueue(PartitionedRegion region,
-      int bucketId) throws ForceReattemptException {
-    PartitionedRegion leader = ColocationHelper.getLeaderRegion(region);
-    if (leader == null)
-      return null;
-    String leaderregionPath = leader.getFullPath();
-    PartitionedRegion prQ = this.userRegionNameToshadowPRMap.get(leaderregionPath);
-    if (prQ == null)
-      return null;
-    HDFSBucketRegionQueue brq;
-
-    brq = ((HDFSBucketRegionQueue)prQ.getDataStore()
-        .getLocalBucketById(bucketId));
-    if(brq == null) {
-      prQ.getRegionAdvisor().waitForLocalBucketStorage(bucketId);
-    }
-    brq = ((HDFSBucketRegionQueue)prQ.getDataStore()
-        .getInitializedBucketForId(null, bucketId));
-    return brq;
-  }
-  
-  /**
-   * This class has the responsibility of rolling the lists of Sorted event 
-   * Queue. The rolling of lists by a separate thread is required because 
-   * neither put thread nor the peek/remove thread can do that. Put thread
-   * cannot do it because that would mean doing some synchronization with 
-   * other put threads and peek thread that would hamper the put latency. 
-   * Peek thread cannot do it because if the event insert rate is too high
-   * the list size can go way beyond what its size. 
-   *
-   */
-  class RollSortedListsTimerTask extends SystemTimerTask {
-    
-    
-    /**
-     * This function ensures that if any of the buckets has lists that are beyond 
-     * its size, they gets rolled over into new skip lists. 
-     */
-    @Override
-    public void run2() {
-      Set<PartitionedRegion> prQs = getRegions();
-      for (PartitionedRegion prQ : prQs) {
-        ArrayList<Integer> buckets = new ArrayList<Integer>(prQ
-            .getDataStore().getAllLocalPrimaryBucketIds());
-        for (Integer bId : buckets) {
-          HDFSBucketRegionQueue hrq =  ((HDFSBucketRegionQueue)prQ
-              .getDataStore().getLocalBucketById(bId));
-          if (hrq == null) {
-            // bucket moved to another node after getAllLocalPrimaryBucketIds
-            // was called. continue fixing the next bucket. 
-            continue;
-          }
-          if (logger.isDebugEnabled()) {
-            logger.debug("Rolling over the list for bucket id: " + bId);
-          }
-          hrq.rolloverSkipList();
-         }
-      }
-    }
-  }
-   
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreConfigHolder.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreConfigHolder.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreConfigHolder.java
deleted file mode 100644
index 16d3d87..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreConfigHolder.java
+++ /dev/null
@@ -1,559 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import java.io.Serializable;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.GemFireConfigException;
-import com.gemstone.gemfire.cache.hdfs.HDFSStore;
-import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory;
-import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator;
-import com.gemstone.gemfire.cache.hdfs.StoreExistsException;
-import com.gemstone.gemfire.internal.cache.xmlcache.CacheXml;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.logging.LogService;
-
-
-/**
- * Class to hold all hdfs store related configuration. Instead of copying the
- * same members in two different classes, factory and store, this class will be
- * used. The idea is let HdfsStoreImpl and HdfsStoreCreation delegate get calls,
- * set calls and copy constructor calls this class. Moreover this config holder
- * can be entirely replaced to support alter config
- * 
- */
-public class HDFSStoreConfigHolder implements HDFSStore, HDFSStoreFactory ,Serializable {  
-  private String name = null;
-  private String namenodeURL = null;
-  private String homeDir = DEFAULT_HOME_DIR;
-  private String clientConfigFile = null;
-  private float blockCacheSize = DEFAULT_BLOCK_CACHE_SIZE;
-  private int maxFileSize = DEFAULT_WRITE_ONLY_FILE_SIZE_LIMIT;
-  private int fileRolloverInterval = DEFAULT_WRITE_ONLY_FILE_ROLLOVER_INTERVAL;
-  protected boolean isAutoCompact = DEFAULT_MINOR_COMPACTION;
-  protected boolean autoMajorCompact = DEFAULT_MAJOR_COMPACTION;
-  protected int maxConcurrency = DEFAULT_MINOR_COMPACTION_THREADS;
-  protected int majorCompactionConcurrency = DEFAULT_MAJOR_COMPACTION_THREADS;
-  protected int majorCompactionIntervalMins = DEFAULT_MAJOR_COMPACTION_INTERVAL_MINS;
-  protected int maxInputFileSizeMB = DEFAULT_INPUT_FILE_SIZE_MAX_MB;
-  protected int maxInputFileCount = DEFAULT_INPUT_FILE_COUNT_MAX;
-  protected int minInputFileCount = DEFAULT_INPUT_FILE_COUNT_MIN;
-  protected int oldFileCleanupIntervalMins = DEFAULT_OLD_FILE_CLEANUP_INTERVAL_MINS;
-  
-  protected int batchSize = DEFAULT_BATCH_SIZE_MB;
-  protected int batchIntervalMillis = DEFAULT_BATCH_INTERVAL_MILLIS;
-  protected int maximumQueueMemory = DEFAULT_MAX_BUFFER_MEMORY;
-  protected boolean isPersistenceEnabled = DEFAULT_BUFFER_PERSISTANCE;
-  protected String diskStoreName = null;
-  protected boolean diskSynchronous = DEFAULT_DISK_SYNCHRONOUS; 
-  protected int dispatcherThreads = DEFAULT_DISPATCHER_THREADS;
-  
-  private static final Logger logger = LogService.getLogger();
-  protected final String logPrefix;
-
-  public HDFSStoreConfigHolder() {
-    this(null);
-  }
-
-  /**
-   * @param config configuration source for creating this instance 
-   */
-  public HDFSStoreConfigHolder(HDFSStore config) {
-    this.logPrefix = "<" + getName() + "> ";
-    if (config == null) {
-      return;
-    }
-    
-    this.name = config.getName();
-    this.namenodeURL = config.getNameNodeURL();
-    this.homeDir = config.getHomeDir();
-    this.clientConfigFile = config.getHDFSClientConfigFile();
-    this.blockCacheSize = config.getBlockCacheSize();
-    this.maxFileSize = config.getWriteOnlyFileRolloverSize();
-    this.fileRolloverInterval = config.getWriteOnlyFileRolloverInterval();
-    isAutoCompact = config.getMinorCompaction();
-    maxConcurrency = config.getMinorCompactionThreads();
-    autoMajorCompact = config.getMajorCompaction();
-    majorCompactionConcurrency = config.getMajorCompactionThreads();
-    majorCompactionIntervalMins = config.getMajorCompactionInterval();
-    maxInputFileSizeMB = config.getInputFileSizeMax();
-    maxInputFileCount = config.getInputFileCountMax();
-    minInputFileCount = config.getInputFileCountMin();
-    oldFileCleanupIntervalMins = config.getPurgeInterval();
-    
-    batchSize = config.getBatchSize();
-    batchIntervalMillis = config.getBatchInterval();
-    maximumQueueMemory = config.getMaxMemory();
-    isPersistenceEnabled = config.getBufferPersistent();
-    diskStoreName = config.getDiskStoreName();
-    diskSynchronous = config.getSynchronousDiskWrite();
-    dispatcherThreads = config.getDispatcherThreads();
-  }
-  
-  public void resetDefaultValues() {
-    name = null;
-    namenodeURL = null;
-    homeDir = null;
-    clientConfigFile = null;
-    blockCacheSize = -1f;
-    maxFileSize = -1;
-    fileRolloverInterval = -1;
-    
-    isAutoCompact = false;
-    maxConcurrency = -1;
-    maxInputFileSizeMB = -1;
-    maxInputFileCount = -1;
-    minInputFileCount = -1;
-    oldFileCleanupIntervalMins = -1;
-
-    autoMajorCompact = false;
-    majorCompactionConcurrency = -1;
-    majorCompactionIntervalMins = -1;
-    
-    batchSize = -1;
-    batchIntervalMillis = -1;
-    maximumQueueMemory = -1;
-    isPersistenceEnabled = false;
-    diskStoreName = null;
-    diskSynchronous = false; 
-    dispatcherThreads = -1;
-  }
-  
-  public void copyFrom(HDFSStoreMutator mutator) {
-    if (mutator.getWriteOnlyFileRolloverInterval() >= 0) {
-      logAttrMutation("fileRolloverInterval", mutator.getWriteOnlyFileRolloverInterval());
-      setWriteOnlyFileRolloverInterval(mutator.getWriteOnlyFileRolloverInterval());
-    }
-    if (mutator.getWriteOnlyFileRolloverSize() >= 0) {
-      logAttrMutation("MaxFileSize", mutator.getWriteOnlyFileRolloverInterval());
-      setWriteOnlyFileRolloverSize(mutator.getWriteOnlyFileRolloverSize());
-    }
-    
-    if (mutator.getMinorCompaction() != null) {
-      logAttrMutation("MinorCompaction", mutator.getMinorCompaction());
-      setMinorCompaction(mutator.getMinorCompaction());
-    }
-    
-    if (mutator.getMinorCompactionThreads() >= 0) {
-      logAttrMutation("MaxThreads", mutator.getMinorCompactionThreads());
-      setMinorCompactionThreads(mutator.getMinorCompactionThreads());
-    }
-    
-    if (mutator.getMajorCompactionInterval() > -1) {
-      logAttrMutation("MajorCompactionIntervalMins", mutator.getMajorCompactionInterval());
-      setMajorCompactionInterval(mutator.getMajorCompactionInterval());
-    }
-    if (mutator.getMajorCompactionThreads() >= 0) {
-      logAttrMutation("MajorCompactionMaxThreads", mutator.getMajorCompactionThreads());
-      setMajorCompactionThreads(mutator.getMajorCompactionThreads());
-    }
-    if (mutator.getMajorCompaction() != null) {
-      logAttrMutation("AutoMajorCompaction", mutator.getMajorCompaction());
-      setMajorCompaction(mutator.getMajorCompaction());
-    }
-    if (mutator.getInputFileCountMax() >= 0) {
-      logAttrMutation("maxInputFileCount", mutator.getInputFileCountMax());
-      setInputFileCountMax(mutator.getInputFileCountMax());
-    }
-    if (mutator.getInputFileSizeMax() >= 0) {
-      logAttrMutation("MaxInputFileSizeMB", mutator.getInputFileSizeMax());
-      setInputFileSizeMax(mutator.getInputFileSizeMax());
-    }
-    if (mutator.getInputFileCountMin() >= 0) {
-      logAttrMutation("MinInputFileCount", mutator.getInputFileCountMin());
-      setInputFileCountMin(mutator.getInputFileCountMin());
-    }    
-    if (mutator.getPurgeInterval() >= 0) {
-      logAttrMutation("OldFilesCleanupIntervalMins", mutator.getPurgeInterval());
-      setPurgeInterval(mutator.getPurgeInterval());
-    }
-    
-    if (mutator.getBatchSize() >= 0) {
-      logAttrMutation("batchSizeMB", mutator.getWriteOnlyFileRolloverInterval());
-      setBatchSize(mutator.getBatchSize());
-    }
-    if (mutator.getBatchInterval() >= 0) {
-      logAttrMutation("batchTimeInterval", mutator.getWriteOnlyFileRolloverInterval());
-      setBatchInterval(mutator.getBatchInterval());
-    }
-  }
-
-  void logAttrMutation(String name, Object value) {
-    if (logger.isDebugEnabled()) {
-      logger.debug("{}Alter " + name + ":" + value, logPrefix);
-    }
-  }
-
-  @Override
-  public String getName() {
-    return name;
-  }
-  @Override
-  public HDFSStoreFactory setName(String name) {
-    this.name = name;
-    return this;
-  }
-
-  @Override
-  public String getNameNodeURL() {
-    return namenodeURL;
-  }
-  @Override
-  public HDFSStoreFactory setNameNodeURL(String namenodeURL) {
-    this.namenodeURL = namenodeURL;
-    return this;
-  }
-
-  @Override
-  public String getHomeDir() {
-    return homeDir;
-  }
-  @Override
-  public HDFSStoreFactory setHomeDir(String homeDir) {
-    this.homeDir = homeDir;
-    return this;
-  }
-
-  @Override
-  public String getHDFSClientConfigFile() {
-    return clientConfigFile;
-  }
-  @Override
-  public HDFSStoreFactory setHDFSClientConfigFile(String clientConfigFile) {
-    this.clientConfigFile = clientConfigFile;
-    return this;
-  }
-  
-  @Override
-  public HDFSStoreFactory setBlockCacheSize(float percentage) {
-    if(percentage < 0 || percentage > 100) {
-      throw new IllegalArgumentException("Block cache size must be between 0 and 100, inclusive");
-    }
-    this.blockCacheSize  = percentage;
-    return this;
-  }
-  
-  @Override
-  public float getBlockCacheSize() {
-    return blockCacheSize;
-  }
-  
-  @Override
-  public HDFSStoreFactory setWriteOnlyFileRolloverSize(int maxFileSize) {
-    assertIsPositive(CacheXml.HDFS_WRITE_ONLY_FILE_ROLLOVER_INTERVAL, maxFileSize);
-    this.maxFileSize = maxFileSize;
-    return this;
-  }
-  @Override
-  public int getWriteOnlyFileRolloverSize() {
-    return maxFileSize;
-  }
-
-  @Override
-  public HDFSStoreFactory setWriteOnlyFileRolloverInterval(int count) {
-    assertIsPositive(CacheXml.HDFS_TIME_FOR_FILE_ROLLOVER, count);
-    this.fileRolloverInterval = count;
-    return this;
-  }
-  @Override
-  public int getWriteOnlyFileRolloverInterval() {
-    return fileRolloverInterval;
-  }
-  
-  @Override
-  public boolean getMinorCompaction() {
-    return isAutoCompact;
-  }
-  @Override
-  public HDFSStoreFactory setMinorCompaction(boolean auto) {
-    this.isAutoCompact = auto;
-    return this;
-  }
-
-  @Override
-  public HDFSStoreFactory setMinorCompactionThreads(int count) {
-    assertIsPositive(CacheXml.HDFS_MINOR_COMPACTION_THREADS, count);
-    this.maxConcurrency = count;
-    return this;
-  }
-  @Override
-  public int getMinorCompactionThreads() {
-    return maxConcurrency;
-  }
-
-  @Override
-  public HDFSStoreFactory setMajorCompaction(boolean auto) {
-    this.autoMajorCompact = auto;
-    return this;
-  }
-  @Override
-  public boolean getMajorCompaction() {
-    return autoMajorCompact;
-  }
-
-  @Override
-  public HDFSStoreFactory setMajorCompactionInterval(int count) {
-    HDFSStoreCreation.assertIsPositive(CacheXml.HDFS_MAJOR_COMPACTION_INTERVAL, count);
-    this.majorCompactionIntervalMins = count;
-    return this;
-  }
-  @Override
-  public int getMajorCompactionInterval() {
-    return majorCompactionIntervalMins;
-  }
-
-  @Override
-  public HDFSStoreFactory setMajorCompactionThreads(int count) {
-    HDFSStoreCreation.assertIsPositive(CacheXml.HDFS_MAJOR_COMPACTION_THREADS, count);
-    this.majorCompactionConcurrency = count;
-    return this;
-  }
-  @Override
-  public int getMajorCompactionThreads() {
-    return majorCompactionConcurrency;
-  }
-  
-  @Override
-  public HDFSStoreFactory setInputFileSizeMax(int size) {
-    HDFSStoreCreation.assertIsPositive("HDFS_COMPACTION_MAX_INPUT_FILE_SIZE_MB", size);
-    this.maxInputFileSizeMB = size;
-    return this;
-  }
-  @Override
-  public int getInputFileSizeMax() {
-    return maxInputFileSizeMB;
-  }
-
-  @Override
-  public HDFSStoreFactory setInputFileCountMin(int count) {
-    HDFSStoreCreation.assertIsPositive("HDFS_COMPACTION_MIN_INPUT_FILE_COUNT", count);
-    this.minInputFileCount = count;
-    return this;
-  }
-  @Override
-  public int getInputFileCountMin() {
-    return minInputFileCount;
-  }
-
-  @Override
-  public HDFSStoreFactory setInputFileCountMax(int count) {
-    HDFSStoreCreation.assertIsPositive("HDFS_COMPACTION_MAX_INPUT_FILE_COUNT", count);
-    this.maxInputFileCount = count;
-    return this;
-  }
-  @Override
-  public int getInputFileCountMax() {
-    return maxInputFileCount;
-  }
-
-  @Override
-  public int getPurgeInterval() {
-    return oldFileCleanupIntervalMins ;
-  }    
-  @Override
-  public HDFSStoreFactory setPurgeInterval(int interval) {
-    assertIsPositive(CacheXml.HDFS_PURGE_INTERVAL, interval);
-    this.oldFileCleanupIntervalMins = interval;
-    return this;
-  }
-  
-  protected void validate() {
-    if (minInputFileCount > maxInputFileCount) {
-      throw new IllegalArgumentException(
-          LocalizedStrings.HOPLOG_MIN_IS_MORE_THAN_MAX
-          .toLocalizedString(new Object[] {
-              "HDFS_COMPACTION_MIN_INPUT_FILE_COUNT",
-              minInputFileCount,
-              "HDFS_COMPACTION_MAX_INPUT_FILE_COUNT",
-              maxInputFileCount }));
-    }
-  }
-
-  /**
-   * This method should not be called on this class.
-   * @see HDFSStoreFactory#create(String)
-   */
-  @Override
-  public HDFSStore create(String name) throws GemFireConfigException,
-      StoreExistsException {
-    throw new UnsupportedOperationException();
-  }
-
-  /**
-   * This method should not be called on this class.
-   * @see HDFSStoreImpl#destroy()
-   */
-  @Override
-  public void destroy() {
-    throw new UnsupportedOperationException();
-  }
-  
-  public static void assertIsPositive(String name, int count) {
-    if (count < 1) {
-      throw new IllegalArgumentException(
-          LocalizedStrings.DiskWriteAttributesImpl_0_HAS_TO_BE_POSITIVE_NUMBER_AND_THE_VALUE_GIVEN_1_IS_NOT_ACCEPTABLE
-              .toLocalizedString(new Object[] { name, count }));
-    }
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder builder = new StringBuilder();
-    builder.append("HDFSStoreConfigHolder@");
-    builder.append(System.identityHashCode(this));
-    builder.append(" [");
-    appendStrProp(builder, name, "name");
-    appendStrProp(builder, namenodeURL, "namenodeURL");
-    appendStrProp(builder, homeDir, "homeDir");
-    appendStrProp(builder, clientConfigFile, "clientConfigFile");
-    if (blockCacheSize > -1) {
-      builder.append("blockCacheSize=");
-      builder.append(blockCacheSize);
-      builder.append(", ");
-    }
-    appendIntProp(builder, maxFileSize, "maxFileSize");
-    appendIntProp(builder, fileRolloverInterval, "fileRolloverInterval");
-    appendBoolProp(builder, isAutoCompact, "isAutoCompact");
-    appendBoolProp(builder, autoMajorCompact, "autoMajorCompact");
-    appendIntProp(builder, maxConcurrency, "maxConcurrency");
-    appendIntProp(builder, majorCompactionConcurrency, "majorCompactionConcurrency");
-    appendIntProp(builder, majorCompactionIntervalMins, "majorCompactionIntervalMins");
-    appendIntProp(builder, maxInputFileSizeMB, "maxInputFileSizeMB");
-    appendIntProp(builder, maxInputFileCount, "maxInputFileCount");
-    appendIntProp(builder, minInputFileCount, "minInputFileCount");
-    appendIntProp(builder, oldFileCleanupIntervalMins, "oldFileCleanupIntervalMins");
-    appendIntProp(builder, batchSize, "batchSize");
-    appendIntProp(builder, batchIntervalMillis, "batchInterval");
-    appendIntProp(builder, maximumQueueMemory, "maximumQueueMemory");
-    appendIntProp(builder, dispatcherThreads, "dispatcherThreads");
-    appendBoolProp(builder, isPersistenceEnabled, "isPersistenceEnabled");
-    appendStrProp(builder, diskStoreName, "diskStoreName");
-    appendBoolProp(builder, diskSynchronous, "diskSynchronous");
-
-    builder.append("]");
-    return builder.toString();
-  }
-
-  private void appendStrProp(StringBuilder builder, String value, String name) {
-    if (value != null) {
-      builder.append(name + "=");
-      builder.append(value);
-      builder.append(", ");
-    }
-  }
-
-  private void appendIntProp(StringBuilder builder, int value, String name) {
-    if (value > -1) {
-      builder.append(name + "=");
-      builder.append(value);
-      builder.append(", ");
-    }
-  }
-  
-  private void appendBoolProp(StringBuilder builder, boolean value, String name) {
-    builder.append(name + "=");
-    builder.append(value);
-    builder.append(", ");
-  }
-
-  @Override
-  public HDFSStoreMutator createHdfsStoreMutator() {
-    // as part of alter execution, hdfs store will replace the config holder
-    // completely. Hence mutator at the config holder is not needed
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public HDFSStore alter(HDFSStoreMutator mutator) {
-    // as part of alter execution, hdfs store will replace the config holder
-    // completely. Hence mutator at the config holder is not needed
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public String getDiskStoreName() {
-    return this.diskStoreName;
-  }
-  @Override
-  public HDFSStoreFactory setDiskStoreName(String name) {
-    this.diskStoreName = name;
-    return this;
-  }
-
-  @Override
-  public int getBatchInterval() {
-    return this.batchIntervalMillis;
-  }
-  @Override
-  public HDFSStoreFactory setBatchInterval(int intervalMillis){
-    this.batchIntervalMillis = intervalMillis;
-    return this;
-  }
-  
-  @Override
-  public boolean getBufferPersistent() {
-    return isPersistenceEnabled;
-  }
-  @Override
-  public HDFSStoreFactory setBufferPersistent(boolean isPersistent) {
-    this.isPersistenceEnabled = isPersistent;
-    return this;
-  }
-
-  @Override
-  public int getDispatcherThreads() {
-    return dispatcherThreads;
-  }
-  @Override
-  public HDFSStoreFactory setDispatcherThreads(int dispatcherThreads) {
-    this.dispatcherThreads = dispatcherThreads;
-    return this;
-  }
-  
-  @Override
-  public int getMaxMemory() {
-    return this.maximumQueueMemory;
-  }
-  @Override
-  public HDFSStoreFactory setMaxMemory(int memory) {
-    this.maximumQueueMemory = memory;
-    return this;
-  }
-  
-  @Override
-  public int getBatchSize() {
-    return this.batchSize;
-  }
-  @Override
-  public HDFSStoreFactory setBatchSize(int size){
-    this.batchSize = size;
-    return this;
-  }
-  
-  @Override
-  public boolean getSynchronousDiskWrite() {
-    return this.diskSynchronous;
-  }
-  @Override
-  public HDFSStoreFactory setSynchronousDiskWrite(boolean isSynchronous) {
-    this.diskSynchronous = isSynchronous;
-    return this;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreCreation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreCreation.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreCreation.java
deleted file mode 100644
index 9ecc5e3..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreCreation.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import com.gemstone.gemfire.GemFireConfigException;
-import com.gemstone.gemfire.cache.hdfs.HDFSStore;
-import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory;
-import com.gemstone.gemfire.cache.hdfs.StoreExistsException;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-
-/**
- */
-public class HDFSStoreCreation implements HDFSStoreFactory {
-  protected HDFSStoreConfigHolder configHolder;
-  
-  public HDFSStoreCreation() {
-    this(null);
-  }
-
-  /**
-   * Copy constructor for HDFSStoreCreation
-   * @param config configuration source for creating this instance 
-   */
-  public HDFSStoreCreation(HDFSStoreCreation config) {
-    this.configHolder = new HDFSStoreConfigHolder(config == null ? null : config.configHolder);
-  }
-
-  @Override
-  public HDFSStoreFactory setName(String name) {
-    configHolder.setName(name);
-    return this;
-  }
-
-  @Override
-  public HDFSStoreFactory setNameNodeURL(String namenodeURL) {
-    configHolder.setNameNodeURL(namenodeURL);
-    return this;
-  }
-
-  @Override
-  public HDFSStoreFactory setHomeDir(String homeDir) {
-    configHolder.setHomeDir(homeDir);
-    return this;
-  }
-
-  @Override
-  public HDFSStoreFactory setHDFSClientConfigFile(String clientConfigFile) {
-    configHolder.setHDFSClientConfigFile(clientConfigFile);
-    return this;
-  }
-  
-  @Override
-  public HDFSStoreFactory setBlockCacheSize(float percentage) {
-    configHolder.setBlockCacheSize(percentage);
-    return this;
-  }
-  
-  @Override
-  public HDFSStoreFactory setWriteOnlyFileRolloverSize(int maxFileSize) {
-    configHolder.setWriteOnlyFileRolloverSize(maxFileSize);
-    return this;
-  }
-
-  @Override
-  public HDFSStoreFactory setWriteOnlyFileRolloverInterval(int count) {
-    configHolder.setWriteOnlyFileRolloverInterval(count);
-    return this;
-  }
-
-  @Override
-  public HDFSStoreFactory setMinorCompaction(boolean auto) {
-    configHolder.setMinorCompaction(auto);
-    return this;
-  }
-  
-  @Override
-  public HDFSStoreFactory setMinorCompactionThreads(int count) {
-    configHolder.setMinorCompactionThreads(count);
-    return this;
-  }
-
-  @Override
-  public HDFSStoreFactory setMajorCompaction(boolean auto) {
-    configHolder.setMajorCompaction(auto);
-    return this;
-  }
-
-  @Override
-  public HDFSStoreFactory setMajorCompactionInterval(int count) {
-    configHolder.setMajorCompactionInterval(count);
-    return this;
-  }
-
-  @Override
-  public HDFSStoreFactory setMajorCompactionThreads(int count) {
-    configHolder.setMajorCompactionThreads(count);
-    return this;
-  }
-
-  @Override
-  public HDFSStoreFactory setInputFileSizeMax(int size) {
-    configHolder.setInputFileSizeMax(size);
-    return this;
-  }
-
-  @Override
-  public HDFSStoreFactory setInputFileCountMin(int count) {
-    configHolder.setInputFileCountMin(count);
-    return this;
-  }
-
-  @Override
-  public HDFSStoreFactory setInputFileCountMax(int count) {
-    configHolder.setInputFileCountMax(count);
-    return this;
-  }
-
-  @Override
-  public HDFSStoreFactory setPurgeInterval(int interval) {
-    configHolder.setPurgeInterval(interval);
-    return this;
-  }
-
-  @Override
-  public HDFSStoreFactory setDiskStoreName(String name) {
-    configHolder.setDiskStoreName(name);
-    return this;
-  }
-
-  @Override
-  public HDFSStoreFactory setMaxMemory(int memory) {
-    configHolder.setMaxMemory(memory);
-    return this;
-  }
-
-  @Override
-  public HDFSStoreFactory setBatchInterval(int intervalMillis) {
-    configHolder.setBatchInterval(intervalMillis);
-    return this;
-  }
-
-  @Override
-  public HDFSStoreFactory setBatchSize(int size) {
-    configHolder.setBatchSize(size);
-    return this;
-  }
-
-  @Override
-  public HDFSStoreFactory setBufferPersistent(boolean isPersistent) {
-    configHolder.setBufferPersistent(isPersistent);
-    return this;
-  }
-
-  @Override
-  public HDFSStoreFactory setSynchronousDiskWrite(boolean isSynchronous) {
-    configHolder.setSynchronousDiskWrite(isSynchronous);
-    return this;
-  }
-
-  @Override
-  public HDFSStoreFactory setDispatcherThreads(int dispatcherThreads) {
-    configHolder.setDispatcherThreads(dispatcherThreads);
-    return this;
-  }
-  
-  /**
-   * This method should not be called on this class.
-   * @see HDFSStoreFactory#create(String)
-   */
-  @Override
-  public HDFSStore create(String name) throws GemFireConfigException,
-      StoreExistsException {
-    throw new UnsupportedOperationException();
-  }
-
-  public static void assertIsPositive(String name, int count) {
-    if (count < 1) {
-      throw new IllegalArgumentException(
-          LocalizedStrings.DiskWriteAttributesImpl_0_HAS_TO_BE_POSITIVE_NUMBER_AND_THE_VALUE_GIVEN_1_IS_NOT_ACCEPTABLE
-              .toLocalizedString(new Object[] { name, count }));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreFactoryImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreFactoryImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreFactoryImpl.java
deleted file mode 100644
index 749f01c..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreFactoryImpl.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import com.gemstone.gemfire.GemFireConfigException;
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.hdfs.HDFSStore;
-import com.gemstone.gemfire.cache.hdfs.StoreExistsException;
-import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-
-
-/**
- * Implementation of HDFSStoreFactory 
- * 
- */
-public class HDFSStoreFactoryImpl extends HDFSStoreCreation {
-  public static final String DEFAULT_ASYNC_QUEUE_ID_FOR_HDFS= "HDFS_QUEUE";
-  
-  private Cache cache;
-  
-  public HDFSStoreFactoryImpl(Cache cache) {
-    this(cache, null);
-  }
-  
-  public HDFSStoreFactoryImpl(Cache cache, HDFSStoreCreation config) {
-    super(config);
-    this.cache = cache;
-  }
-
-  @Override
-  public HDFSStore create(String name) {
-    if (name == null) {
-      throw new GemFireConfigException("HDFS store name not provided");
-    }
-    
-    this.configHolder.validate();
-    
-    HDFSStore result = null;
-    synchronized (this) {
-      if (this.cache instanceof GemFireCacheImpl) {
-        GemFireCacheImpl gfc = (GemFireCacheImpl) this.cache;
-        if (gfc.findHDFSStore(name) != null) {
-          throw new StoreExistsException(name);
-        }
-        
-        HDFSStoreImpl hsi = new HDFSStoreImpl(name, this.configHolder);
-        gfc.addHDFSStore(hsi);
-        result = hsi;
-      }
-    }
-    return result;
-  }
-
-  public static final String getEventQueueName(String regionPath) {
-    return HDFSStoreFactoryImpl.DEFAULT_ASYNC_QUEUE_ID_FOR_HDFS + "_"
-        + regionPath.replace('/', '_');
-  }
-
-  public HDFSStore getConfigView() {
-    return (HDFSStore) configHolder;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreImpl.java
deleted file mode 100644
index b5d56b6..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreImpl.java
+++ /dev/null
@@ -1,638 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URI;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.concurrent.Callable;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.io.hfile.BlockCache;
-import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.net.ConnectTimeoutException;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
-import com.gemstone.gemfire.cache.hdfs.HDFSStore;
-import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSCompactionManager;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSStoreDirector;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog.HoplogWriter;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogConfig;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.HoplogUtil;
-import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
-import com.gemstone.gemfire.internal.cache.control.HeapMemoryMonitor;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.HFileStoreStatistics;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
-import com.gemstone.gemfire.internal.util.SingletonCallable;
-import com.gemstone.gemfire.internal.util.SingletonValue;
-import com.gemstone.gemfire.internal.util.SingletonValue.SingletonBuilder;
-
-/**
- * Represents a HDFS based persistent store for region data.
- * 
- */
-public class HDFSStoreImpl implements HDFSStore {
-  
-  private volatile HDFSStoreConfigHolder configHolder; 
-  
-  private final SingletonValue<FileSystem> fs;
-
-  /**
-   * Used to make sure that only one thread creates the writer at a time. This prevents the dispatcher
-   * threads from cascading the Connection lock in DFS client see bug 51195
-   */
-  private final SingletonCallable<HoplogWriter> singletonWriter = new SingletonCallable<HoplogWriter>();
-
-  private final HFileStoreStatistics stats;
-  private final BlockCache blockCache;
-
-  private static HashSet<String> secureNameNodes = new HashSet<String>();
-  
-  private final boolean PERFORM_SECURE_HDFS_CHECK = Boolean.getBoolean(HoplogConfig.PERFORM_SECURE_HDFS_CHECK_PROP);
-  private static final Logger logger = LogService.getLogger();
-  protected final String logPrefix;
-  
-  static {
-    HdfsConfiguration.init();
-  }
-  
-  public HDFSStoreImpl(String name, final HDFSStore config) {
-    this.configHolder = new HDFSStoreConfigHolder(config);
-    configHolder.setName(name);
-
-    this.logPrefix = "<" + "HdfsStore:" + name + "> ";
-
-    stats = new HFileStoreStatistics(InternalDistributedSystem.getAnyInstance(), "HDFSStoreStatistics", name);
-
-    final Configuration hconf = new Configuration();
-        
-    // Set the block cache size.
-    // Disable the static block cache. We keep our own cache on the HDFS Store
-    // hconf.setFloat("hfile.block.cache.size", 0f);
-    if (this.getBlockCacheSize() != 0) {
-      long cacheSize = (long) (HeapMemoryMonitor.getTenuredPoolMaxMemory() * this.getBlockCacheSize() / 100);
-
-      // TODO use an off heap block cache if we're using off heap memory?
-      // See CacheConfig.instantiateBlockCache.
-      // According to Anthony, the off heap block cache is still
-      // experimental. Our own off heap cache might be a better bet.
-//      this.blockCache = new LruBlockCache(cacheSize,
-//          StoreFile.DEFAULT_BLOCKSIZE_SMALL, hconf, HFileSortedOplogFactory.convertStatistics(stats));
-      this.blockCache = new LruBlockCache(cacheSize, StoreFile.DEFAULT_BLOCKSIZE_SMALL, hconf);
-    } else {
-      this.blockCache = null;
-    }
-    
-    final String clientFile = config.getHDFSClientConfigFile();
-    fs = new SingletonValue<FileSystem>(new SingletonBuilder<FileSystem>() {
-      @Override
-      public FileSystem create() throws IOException {
-        return createFileSystem(hconf, clientFile, false);
-      }
-
-      @Override
-      public void postCreate() {
-      }
-      
-      @Override
-      public void createInProgress() {
-      }
-    });
-    
-    FileSystem fileSystem = null;
-    try {
-      fileSystem = fs.get();
-    } catch (Throwable ex) {
-      throw new HDFSIOException(ex.getMessage(),ex);
-    }    
-    //HDFSCompactionConfig has already been initialized
-    long cleanUpIntervalMillis = getPurgeInterval() * 60 * 1000;
-    Path cleanUpIntervalPath = new Path(getHomeDir(), HoplogConfig.CLEAN_UP_INTERVAL_FILE_NAME);
-    HoplogUtil.exposeCleanupIntervalMillis(fileSystem, cleanUpIntervalPath, cleanUpIntervalMillis);
-  }
-  
-  /**
-   * Creates a new file system every time.  
-   */
-  public FileSystem createFileSystem() {
-    Configuration hconf = new Configuration();
-    try {
-      return createFileSystem(hconf, this.getHDFSClientConfigFile(), true);
-    } catch (Throwable ex) {
-      throw new HDFSIOException(ex.getMessage(),ex);
-    }
-  }
-  
-  private FileSystem createFileSystem(Configuration hconf, String configFile, boolean forceNew) throws IOException {
-    FileSystem filesystem = null; 
-    
-      // load hdfs client config file if specified. The path is on local file
-      // system
-      if (configFile != null) {
-        if (logger.isDebugEnabled()) {
-          logger.debug("{}Adding resource config file to hdfs configuration:" + configFile, logPrefix);
-        }
-        hconf.addResource(new Path(configFile));
-        
-        if (! new File(configFile).exists()) {
-          logger.warn(LocalizedMessage.create(LocalizedStrings.HOPLOG_HDFS_CLIENT_CONFIG_FILE_ABSENT, configFile));
-        }
-      }
-      
-      // This setting disables shutdown hook for file system object. Shutdown
-      // hook may cause FS object to close before the cache or store and
-      // unpredictable behavior. This setting is provided for GFXD like server
-      // use cases where FS close is managed by a server. This setting is not
-      // supported by old versions of hadoop, HADOOP-4829
-      hconf.setBoolean("fs.automatic.close", false);
-      
-      // Hadoop has a configuration parameter io.serializations that is a list of serialization 
-      // classes which can be used for obtaining serializers and deserializers. This parameter 
-      // by default contains avro classes. When a sequence file is created, it calls 
-      // SerializationFactory.getSerializer(keyclass). This internally creates objects using 
-      // reflection of all the classes that were part of io.serializations. But since, there is 
-      // no avro class available it throws an exception. 
-      // Before creating a sequenceFile, override the io.serializations parameter and pass only the classes 
-      // that are important to us. 
-      hconf.setStrings("io.serializations",
-          new String[]{"org.apache.hadoop.io.serializer.WritableSerialization"});
-      // create writer
-
-      SchemaMetrics.configureGlobally(hconf);
-      
-      String nameNodeURL = null;
-      if ((nameNodeURL = getNameNodeURL()) == null) {
-          nameNodeURL = hconf.get("fs.default.name");
-      }
-      
-      URI namenodeURI = URI.create(nameNodeURL);
-    
-    //if (! GemFireCacheImpl.getExisting().isHadoopGfxdLonerMode()) {
-      String authType = hconf.get("hadoop.security.authentication");
-      
-      //The following code handles Gemfire XD with secure HDFS
-      //A static set is used to cache all known secure HDFS NameNode urls.
-      UserGroupInformation.setConfiguration(hconf);
-
-      //Compare authentication method ignoring case to make GFXD future version complaint
-      //At least version 2.0.2 starts complaining if the string "kerberos" is not in all small case.
-      //However it seems current version of hadoop accept the authType in any case
-      if (authType.equalsIgnoreCase("kerberos")) {
-        
-        String principal = hconf.get(HoplogConfig.KERBEROS_PRINCIPAL);
-        String keyTab = hconf.get(HoplogConfig.KERBEROS_KEYTAB_FILE);
-       
-        if (!PERFORM_SECURE_HDFS_CHECK) {
-          if (logger.isDebugEnabled())
-            logger.debug("{}Ignore secure hdfs check", logPrefix);
-        } else {
-          if (!secureNameNodes.contains(nameNodeURL)) {
-            if (logger.isDebugEnabled())
-              logger.debug("{}Executing secure hdfs check", logPrefix);
-             try{
-              filesystem = FileSystem.newInstance(namenodeURI, hconf);
-              //Make sure no IOExceptions are generated when accessing insecure HDFS. 
-              filesystem.listFiles(new Path("/"),false);
-              throw new HDFSIOException("Gemfire XD HDFS client and HDFS cluster security levels do not match. The configured HDFS Namenode is not secured.");
-             } catch (IOException ex) {
-               secureNameNodes.add(nameNodeURL);
-             } finally {
-             //Close filesystem to avoid resource leak
-               if(filesystem != null) {
-                 closeFileSystemIgnoreError(filesystem);
-               }
-             }
-          }
-        }
-
-        // check to ensure the namenode principal is defined
-        String nameNodePrincipal = hconf.get("dfs.namenode.kerberos.principal");
-        if (nameNodePrincipal == null) {
-          throw new IOException(LocalizedStrings.GF_KERBEROS_NAMENODE_PRINCIPAL_UNDEF.toLocalizedString());
-        }
-        
-        // ok, the user specified a gfxd principal so we will try to login
-        if (principal != null) {
-          //If NameNode principal is the same as Gemfire XD principal, there is a 
-          //potential security hole
-          String regex = "[/@]";
-          if (nameNodePrincipal != null) {
-            String HDFSUser = nameNodePrincipal.split(regex)[0];
-            String GFXDUser = principal.split(regex)[0];
-            if (HDFSUser.equals(GFXDUser)) {
-              logger.warn(LocalizedMessage.create(LocalizedStrings.HDFS_USER_IS_SAME_AS_GF_USER, GFXDUser));
-            }
-          }
-          
-          // a keytab must exist if the user specifies a principal
-          if (keyTab == null) {
-            throw new IOException(LocalizedStrings.GF_KERBEROS_KEYTAB_UNDEF.toLocalizedString());
-          }
-          
-          // the keytab must exist as well
-          File f = new File(keyTab);
-          if (!f.exists()) {
-            throw new FileNotFoundException(LocalizedStrings.GF_KERBEROS_KEYTAB_FILE_ABSENT.toLocalizedString(f.getAbsolutePath()));
-          }
-
-          //Authenticate Gemfire XD principal to Kerberos KDC using Gemfire XD keytab file
-          String principalWithValidHost = SecurityUtil.getServerPrincipal(principal, "");
-          UserGroupInformation.loginUserFromKeytab(principalWithValidHost, keyTab);
-        } else {
-          logger.warn(LocalizedMessage.create(LocalizedStrings.GF_KERBEROS_PRINCIPAL_UNDEF));
-        }
-      }
-    //}
-
-    filesystem = getFileSystemFactory().create(namenodeURI, hconf, forceNew);
-    
-    if (logger.isDebugEnabled()) {
-      logger.debug("{}Initialized FileSystem linked to " + filesystem.getUri()
-          + " " + filesystem.hashCode(), logPrefix);
-    }
-    return filesystem;
-  }
-
-  public FileSystem getFileSystem() throws IOException {
-    return fs.get();
-  }
-  
-  public FileSystem getCachedFileSystem() {
-    return fs.getCachedValue();
-  }
-
-  public SingletonCallable<HoplogWriter> getSingletonWriter() {
-    return this.singletonWriter;
-  }
-
-  private final SingletonCallable<Boolean> fsExists = new SingletonCallable<Boolean>();
-
-  public boolean checkFileSystemExists() throws IOException {
-    try {
-      return fsExists.runSerially(new Callable<Boolean>() {
-        @Override
-        public Boolean call() throws Exception {
-          FileSystem fileSystem = getCachedFileSystem();
-          if (fileSystem == null) {
-            return false;
-          }
-          return fileSystem.exists(new Path("/"));
-        }
-      });
-    } catch (Exception e) {
-      if (e instanceof IOException) {
-        throw (IOException)e;
-      }
-      throw new IOException(e);
-    }
-  }
-
-  /**
-   * This method executes a query on namenode. If the query succeeds, FS
-   * instance is healthy. If it fails, the old instance is closed and a new
-   * instance is created.
-   */
-  public void checkAndClearFileSystem() {
-    FileSystem fileSystem = getCachedFileSystem();
-    
-    if (fileSystem != null) {
-      if (logger.isDebugEnabled()) {
-        logger.debug("{}Checking file system at " + fileSystem.getUri(), logPrefix);
-      }
-      try {
-        checkFileSystemExists();
-        if (logger.isDebugEnabled()) {
-          logger.debug("{}FS client is ok: " + fileSystem.getUri() + " "
-              + fileSystem.hashCode(), logPrefix);
-        }
-        return;
-      } catch (ConnectTimeoutException e) {
-        if (logger.isDebugEnabled()) {
-          logger.debug("{}Hdfs unreachable, FS client is ok: "
-              + fileSystem.getUri() + " " + fileSystem.hashCode(), logPrefix);
-        }
-        return;
-      } catch (IOException e) {
-        logger.debug("IOError in filesystem checkAndClear ", e);
-        
-        // The file system is closed or NN is not reachable. It is safest to
-        // create a new FS instance. If the NN continues to remain unavailable,
-        // all subsequent read/write request will cause HDFSIOException. This is
-        // similar to the way hbase manages failures. This has a drawback
-        // though. A network blip will result in all connections to be
-        // recreated. However trying to preserve the connections and waiting for
-        // FS to auto-recover is not deterministic.
-        if (e instanceof RemoteException) {
-          e = ((RemoteException) e).unwrapRemoteException();
-        }
-
-        logger.warn(LocalizedMessage.create(LocalizedStrings.HOPLOG_HDFS_UNREACHABLE,
-            fileSystem.getUri()), e);
-      }
-
-      // compare and clear FS container. The fs container needs to be reusable
-      boolean result = fs.clear(fileSystem, true);
-      if (!result) {
-        // the FS instance changed after this call was initiated. Check again
-        logger.debug("{}Failed to clear FS ! I am inconsistent so retrying ..", logPrefix);
-        checkAndClearFileSystem();
-      } else {
-        closeFileSystemIgnoreError(fileSystem);
-      }      
-    }
-  }
-
-  private void closeFileSystemIgnoreError(FileSystem fileSystem) {
-    if (fileSystem == null) {
-      logger.debug("{}Trying to close null file system", logPrefix);
-      return;
-    }
-
-    try {
-      if (logger.isDebugEnabled()) {
-        logger.debug("{}Closing file system at " + fileSystem.getUri() + " "
-            + fileSystem.hashCode(), logPrefix);
-      }
-      fileSystem.close();
-    } catch (Exception e) {
-      if (logger.isDebugEnabled()) {
-        logger.debug("Failed to close file system at " + fileSystem.getUri()
-            + " " + fileSystem.hashCode(), e);
-      }
-    }
-  }
-
-  public HFileStoreStatistics getStats() {
-    return stats;
-  }
-  
-  public BlockCache getBlockCache() {
-    return blockCache;
-  }
-
-  public void close() {
-    logger.debug("{}Closing file system: " + getName(), logPrefix);
-    stats.close();
-    blockCache.shutdown();
-    //Might want to clear the block cache, but it should be dereferenced.
-    
-    // release DDL hoplog organizer for this store. Also shutdown compaction
-    // threads. These two resources hold references to GemfireCacheImpl
-    // instance. Any error is releasing this resources is not critical and needs
-    // be ignored.
-    try {
-      HDFSCompactionManager manager = HDFSCompactionManager.getInstance(this);
-      if (manager != null) {
-        manager.reset();
-      }
-    } catch (Exception e) {
-      logger.info(e);
-    }
-    
-    // once this store is closed, this store should not be used again
-    FileSystem fileSystem = fs.clear(false);
-    if (fileSystem != null) {
-      closeFileSystemIgnoreError(fileSystem);
-    }    
-  }
-  
-  /**
-   * Test hook to remove all of the contents of the the folder
-   * for this HDFS store from HDFS.
-   * @throws IOException 
-   */
-  public void clearFolder() throws IOException {
-    getFileSystem().delete(new Path(getHomeDir()), true);
-  }
-  
-  @Override
-  public void destroy() {
-    Collection<String> regions = HDFSRegionDirector.getInstance().getRegionsInStore(this);
-    if(!regions.isEmpty()) {
-      throw new IllegalStateException("Cannot destroy a HDFS store that still contains regions: " + regions); 
-    }
-    close();
-    HDFSStoreDirector.getInstance().removeHDFSStore(this.getName());
-  }
-
-  @Override
-  public synchronized HDFSStore alter(HDFSStoreMutator mutator) {
-    if (logger.isDebugEnabled()) {
-      logger.debug("{}Altering hdfsStore " + this, logPrefix);
-      logger.debug("{}Mutator " + mutator, logPrefix);
-    }
-    HDFSStoreConfigHolder newHolder = new HDFSStoreConfigHolder(configHolder);
-    newHolder.copyFrom(mutator);
-    newHolder.validate();
-    HDFSStore oldStore = configHolder;
-    configHolder = newHolder;
-    if (logger.isDebugEnabled()) {
-      logger.debug("{}Resuult of Alter " + this, logPrefix);
-    }
-    return (HDFSStore) oldStore;
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder builder = new StringBuilder();
-    builder.append("HDFSStoreImpl [");
-    if (configHolder != null) {
-      builder.append("configHolder=");
-      builder.append(configHolder);
-    }
-    builder.append("]");
-    return builder.toString();
-  }
-
-  @Override
-  public String getName() {
-    return configHolder.getName();
-  }
-
-  @Override
-  public String getNameNodeURL() {
-    return configHolder.getNameNodeURL();
-  }
-
-  @Override
-  public String getHomeDir() {
-    return configHolder.getHomeDir();
-  }
-
-  @Override
-  public String getHDFSClientConfigFile() {
-    return configHolder.getHDFSClientConfigFile();
-  }
-
-  @Override
-  public float getBlockCacheSize() {
-    return configHolder.getBlockCacheSize();
-  }
-
-  @Override
-  public int getWriteOnlyFileRolloverSize() {
-    return configHolder.getWriteOnlyFileRolloverSize();
-  }
-
-  @Override
-  public int getWriteOnlyFileRolloverInterval() {
-    return configHolder.getWriteOnlyFileRolloverInterval();
-  }
-
-  @Override
-  public boolean getMinorCompaction() {
-    return configHolder.getMinorCompaction();
-  }
-
-  @Override
-  public int getMinorCompactionThreads() {
-    return configHolder.getMinorCompactionThreads();
-  }
-
-  @Override
-  public boolean getMajorCompaction() {
-    return configHolder.getMajorCompaction();
-  }
-
-  @Override
-  public int getMajorCompactionInterval() {
-    return configHolder.getMajorCompactionInterval();
-  }
-
-  @Override
-  public int getMajorCompactionThreads() {
-    return configHolder.getMajorCompactionThreads();
-  }
-
-
-  @Override
-  public int getInputFileSizeMax() {
-    return configHolder.getInputFileSizeMax();
-  }
-
-  @Override
-  public int getInputFileCountMin() {
-    return configHolder.getInputFileCountMin();
-  }
-
-  @Override
-  public int getInputFileCountMax() {
-    return configHolder.getInputFileCountMax();
-  }
-
-  @Override
-  public int getPurgeInterval() {
-    return configHolder.getPurgeInterval();
-  }
-
-  @Override
-  public String getDiskStoreName() {
-    return configHolder.getDiskStoreName();
-  }
-
-  @Override
-  public int getMaxMemory() {
-    return configHolder.getMaxMemory();
-  }
-
-  @Override
-  public int getBatchSize() {
-    return configHolder.getBatchSize();
-  }
-
-  @Override
-  public int getBatchInterval() {
-    return configHolder.getBatchInterval();
-  }
-
-  @Override
-  public boolean getBufferPersistent() {
-    return configHolder.getBufferPersistent();
-  }
-
-  @Override
-  public boolean getSynchronousDiskWrite() {
-    return configHolder.getSynchronousDiskWrite();
-  }
-
-  @Override
-  public int getDispatcherThreads() {
-    return configHolder.getDispatcherThreads();
-  }
-  
-  @Override
-  public HDFSStoreMutator createHdfsStoreMutator() {
-    return new HDFSStoreMutatorImpl();
-  }
-
-  public FileSystemFactory getFileSystemFactory() {
-    return new DistributedFileSystemFactory();
-  }
-
-  /*
-   * Factory to create HDFS file system instances
-   */
-  static public interface FileSystemFactory {
-    public FileSystem create(URI namenode, Configuration conf, boolean forceNew) throws IOException;
-  }
-
-  /*
-   * File system factory implementations for creating instances of file system
-   * connected to distributed HDFS cluster
-   */
-  public class DistributedFileSystemFactory implements FileSystemFactory {
-    private final boolean ALLOW_TEST_FILE_SYSTEM = Boolean.getBoolean(HoplogConfig.ALLOW_LOCAL_HDFS_PROP);
-    private final boolean USE_FS_CACHE = Boolean.getBoolean(HoplogConfig.USE_FS_CACHE);
-
-    @Override
-    public FileSystem create(URI nn, Configuration conf, boolean create) throws IOException {
-      FileSystem filesystem;
-
-      if (USE_FS_CACHE && !create) {
-        filesystem = FileSystem.get(nn, conf);
-      } else {
-        filesystem = FileSystem.newInstance(nn, conf);
-      }
-
-      if (filesystem instanceof LocalFileSystem && !ALLOW_TEST_FILE_SYSTEM) {
-        closeFileSystemIgnoreError(filesystem);
-        throw new IllegalStateException(
-            LocalizedStrings.HOPLOG_TRYING_TO_CREATE_STANDALONE_SYSTEM.toLocalizedString(getNameNodeURL()));
-      }
-
-      return filesystem;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreMutatorImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreMutatorImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreMutatorImpl.java
deleted file mode 100644
index 203e623..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreMutatorImpl.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import com.gemstone.gemfire.cache.hdfs.HDFSStore;
-import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-
-public class HDFSStoreMutatorImpl implements HDFSStoreMutator {
-  private HDFSStoreConfigHolder configHolder;
-  private Boolean autoCompact;
-  private Boolean autoMajorCompact;
-
-  public HDFSStoreMutatorImpl() {
-    configHolder = new HDFSStoreConfigHolder();
-    configHolder.resetDefaultValues();
-  }
-
-  public HDFSStoreMutatorImpl(HDFSStore store) {
-    configHolder = new HDFSStoreConfigHolder(store);
-  }
-  
-  public HDFSStoreMutator setWriteOnlyFileRolloverSize(int maxFileSize) {
-    configHolder.setWriteOnlyFileRolloverSize(maxFileSize);
-    return this;
-  }
-  @Override
-  public int getWriteOnlyFileRolloverSize() {
-    return configHolder.getWriteOnlyFileRolloverSize();
-  }
-
-  @Override
-  public HDFSStoreMutator setWriteOnlyFileRolloverInterval(int count) {
-    configHolder.setWriteOnlyFileRolloverInterval(count);
-    return this;
-  }
-  @Override
-  public int getWriteOnlyFileRolloverInterval() {
-    return configHolder.getWriteOnlyFileRolloverInterval();
-  }
-
-  @Override
-  public HDFSStoreMutator setMinorCompaction(boolean auto) {
-    autoCompact = Boolean.valueOf(auto);
-    configHolder.setMinorCompaction(auto);
-    return null;
-  }
-  @Override
-  public Boolean getMinorCompaction() {
-    return autoCompact;
-  }
-  
-  @Override
-  public HDFSStoreMutator setMinorCompactionThreads(int count) {
-    configHolder.setMinorCompactionThreads(count);
-    return this;
-  }
-  @Override
-  public int getMinorCompactionThreads() {
-    return configHolder.getMinorCompactionThreads();
-  }
-  
-  @Override
-  public HDFSStoreMutator setMajorCompaction(boolean auto) {
-    autoMajorCompact = Boolean.valueOf(auto);
-    configHolder.setMajorCompaction(auto);
-    return this;
-  }
-  @Override
-  public Boolean getMajorCompaction() {
-    return autoMajorCompact;
-  }
-
-  @Override
-  public HDFSStoreMutator setMajorCompactionInterval(int count) {
-    configHolder.setMajorCompactionInterval(count);
-    return this;
-  }
-  @Override
-  public int getMajorCompactionInterval() {
-    return configHolder.getMajorCompactionInterval();
-  }
-
-  @Override
-  public HDFSStoreMutator setMajorCompactionThreads(int count) {
-    configHolder.setMajorCompactionThreads(count);
-    return this;
-  }
-  @Override
-  public int getMajorCompactionThreads() {
-    return configHolder.getMajorCompactionThreads();
-  }
-
-  @Override
-  public HDFSStoreMutator setInputFileSizeMax(int size) {
-    configHolder.setInputFileSizeMax(size);
-    return this;
-  }
-  @Override
-  public int getInputFileSizeMax() {
-    return configHolder.getInputFileSizeMax();
-  }
-  
-  @Override
-  public HDFSStoreMutator setInputFileCountMin(int count) {
-    configHolder.setInputFileCountMin(count);
-    return this;
-  }
-  @Override
-  public int getInputFileCountMin() {
-    return configHolder.getInputFileCountMin();
-  }
-  
-  @Override
-  public HDFSStoreMutator setInputFileCountMax(int count) {
-    configHolder.setInputFileCountMax(count);
-    return this;
-  }
-  @Override
-  public int getInputFileCountMax() {
-    return configHolder.getInputFileCountMax();
-  }
-  
-  @Override
-  public HDFSStoreMutator setPurgeInterval(int interval) {
-    configHolder.setPurgeInterval(interval);
-    return this;
-  }
-  @Override
-  public int getPurgeInterval() {
-    return configHolder.getPurgeInterval();
-  }
-
-  @Override
-  public int getBatchSize() {
-    return configHolder.batchSize;
-  }
-  @Override
-  public HDFSStoreMutator setBatchSize(int size) {
-    configHolder.setBatchSize(size);
-    return this;
-  }
-
-  
-  @Override
-  public int getBatchInterval() {
-    return configHolder.batchIntervalMillis;
-  }
-  @Override
-  public HDFSStoreMutator setBatchInterval(int interval) {
-    configHolder.setBatchInterval(interval);
-    return this;
-  }
-    
-  public static void assertIsPositive(String name, int count) {
-    if (count < 1) {
-      throw new IllegalArgumentException(
-          LocalizedStrings.DiskWriteAttributesImpl_0_HAS_TO_BE_POSITIVE_NUMBER_AND_THE_VALUE_GIVEN_1_IS_NOT_ACCEPTABLE
-              .toLocalizedString(new Object[] { name, count }));
-    }
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder builder = new StringBuilder();
-    builder.append("HDFSStoreMutatorImpl [");
-    if (configHolder != null) {
-      builder.append("configHolder=");
-      builder.append(configHolder);
-      builder.append(", ");
-    }
-    if (autoCompact != null) {
-      builder.append("MinorCompaction=");
-      builder.append(autoCompact);
-      builder.append(", ");
-    }
-    if (getMajorCompaction() != null) {
-      builder.append("autoMajorCompaction=");
-      builder.append(getMajorCompaction());
-      builder.append(", ");
-    }
-    builder.append("]");
-    return builder.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSWriteOnlyStoreEventListener.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSWriteOnlyStoreEventListener.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSWriteOnlyStoreEventListener.java
deleted file mode 100644
index 0298523..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSWriteOnlyStoreEventListener.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import com.gemstone.gemfire.cache.CacheClosedException;
-import com.gemstone.gemfire.cache.asyncqueue.AsyncEvent;
-import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer;
-import com.gemstone.gemfire.i18n.LogWriterI18n;
-import com.gemstone.gemfire.internal.cache.BucketRegion;
-import com.gemstone.gemfire.internal.cache.ForceReattemptException;
-import com.gemstone.gemfire.internal.cache.LocalRegion;
-import com.gemstone.gemfire.internal.cache.PartitionedRegion;
-import com.gemstone.gemfire.internal.cache.PrimaryBucketException;
-import com.gemstone.gemfire.internal.cache.execute.BucketMovedException;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-
-/**
- * Listener that persists events to a write only HDFS store
- *
- */
-public class HDFSWriteOnlyStoreEventListener implements
-    AsyncEventListener {
-
-  private final LogWriterI18n logger;
-  private volatile boolean senderStopped = false; 
-  private final FailureTracker failureTracker = new FailureTracker(10L, 60 * 1000L, 1.5f);
-  
-  
-  public HDFSWriteOnlyStoreEventListener(LogWriterI18n logger) {
-    this.logger = logger;
-  }
-  
-  @Override
-  public void close() {
-    senderStopped = true;
-  }
-
-  @Override
-  public boolean processEvents(List<AsyncEvent> events) {
-    if (Hoplog.NOP_WRITE) {
-      return true;
-    }
-
-    if (logger.fineEnabled())
-      logger.fine("HDFSWriteOnlyStoreEventListener: A total of " + events.size() + " events are sent from GemFire to persist on HDFS");
-    boolean success = false;
-    try {
-      failureTracker.sleepIfRetry();
-      HDFSGatewayEventImpl hdfsEvent = null;
-      int previousBucketId = -1;
-      BatchManager bm = null;
-      for (AsyncEvent asyncEvent : events) {
-        if (senderStopped){
-          if (logger.fineEnabled()) {
-            logger.fine("HDFSWriteOnlyStoreEventListener.processEvents: Cache is closing down. Ignoring the batch of data.");
-          }
-          return false;
-        }
-        hdfsEvent = (HDFSGatewayEventImpl)asyncEvent;
-        if (previousBucketId != hdfsEvent.getBucketId()){
-          if (previousBucketId != -1) 
-            persistBatch(bm, previousBucketId);
-          
-          previousBucketId = hdfsEvent.getBucketId();
-          bm = new BatchManager();
-        }
-        bm.addEvent(hdfsEvent);
-      }
-      try {
-        persistBatch(bm, hdfsEvent.getBucketId());
-      } catch (BucketMovedException e) {
-        logger.fine("Batch could not be written to HDFS as the bucket moved. bucket id: " + 
-            hdfsEvent.getBucketId() + " Exception: " + e);
-        return false;
-      }
-      success = true;
-    } catch (IOException e) {
-      logger.warning(LocalizedStrings.HOPLOG_FLUSH_FOR_BATCH_FAILED, e);
-      return false;
-    }
-    catch (ClassNotFoundException e) {
-      logger.warning(LocalizedStrings.HOPLOG_FLUSH_FOR_BATCH_FAILED, e);
-      return false;
-    }
-    catch (CacheClosedException e) {
-      // exit silently
-      if (logger.fineEnabled())
-        logger.fine(e);
-      return false;
-    } catch (ForceReattemptException e) {
-      if (logger.fineEnabled())
-        logger.fine(e);
-      return false;
-    } catch (InterruptedException e1) {
-      // TODO Auto-generated catch block
-      e1.printStackTrace();
-    } finally {
-      failureTracker.record(success);
-    }
-    return true;
-  }
-  
-  /**
-   * Persists batches of multiple regions specified by the batch manager
-   * 
-   */
-  private void persistBatch(BatchManager bm, int bucketId) throws IOException, ForceReattemptException {
-    Iterator<Map.Entry<LocalRegion,ArrayList<QueuedPersistentEvent>>> eventsPerRegion = 
-        bm.iterator();
-    HoplogOrganizer bucketOrganizer = null; 
-    while (eventsPerRegion.hasNext()) {
-      Map.Entry<LocalRegion, ArrayList<QueuedPersistentEvent>> eventsForARegion = eventsPerRegion.next();
-      bucketOrganizer = getOrganizer((PartitionedRegion) eventsForARegion.getKey(), bucketId);
-      // bucket organizer cannot be null. 
-      if (bucketOrganizer == null)
-        throw new BucketMovedException("Bucket moved. BucketID: " + bucketId + "  HdfsRegion: " +  eventsForARegion.getKey().getName());
-      bucketOrganizer.flush(eventsForARegion.getValue().iterator(), eventsForARegion.getValue().size());
-      if (logger.fineEnabled()) {
-        logger.fine("Batch written to HDFS of size " +  eventsForARegion.getValue().size() + 
-            " for region " + eventsForARegion.getKey());
-      }
-    }
-  }
-
-  private HoplogOrganizer getOrganizer(PartitionedRegion region, int bucketId) {
-    BucketRegion br = region.getDataStore().getLocalBucketById(bucketId);
-    if (br == null) {
-      // got rebalanced or something
-      throw new BucketMovedException("Bucket region is no longer available. BucketId: "+
-          bucketId + " HdfsRegion: " +  region.getName());
-    }
-
-    return br.getHoplogOrganizer();
-  }
-  
-  /**
-   * Sorts out events of the multiple regions into lists per region 
-   *
-   */
-  private class BatchManager implements Iterable<Map.Entry<LocalRegion,ArrayList<QueuedPersistentEvent>>> {
-    private HashMap<LocalRegion, ArrayList<QueuedPersistentEvent>> regionBatches = 
-        new HashMap<LocalRegion, ArrayList<QueuedPersistentEvent>>();
-    
-    public void addEvent (HDFSGatewayEventImpl hdfsEvent) throws IOException, ClassNotFoundException {
-      LocalRegion region = (LocalRegion) hdfsEvent.getRegion();
-      ArrayList<QueuedPersistentEvent> regionList = regionBatches.get(region);
-      if (regionList == null) {
-        regionList = new ArrayList<QueuedPersistentEvent>();
-        regionBatches.put(region, regionList);
-      }
-      regionList.add(new UnsortedHDFSQueuePersistedEvent(hdfsEvent));
-    }
-
-    @Override
-    public Iterator<Map.Entry<LocalRegion,ArrayList<QueuedPersistentEvent>>> iterator() {
-      return regionBatches.entrySet().iterator();
-    }
-    
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HoplogListenerForRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HoplogListenerForRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HoplogListenerForRegion.java
deleted file mode 100644
index c7ba23f..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HoplogListenerForRegion.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogListener;
-
-/**
- * Objects of this class needs to be created for every region. These objects 
- * listen to the oplog events and take appropriate action.   
- *
- */
-public class HoplogListenerForRegion implements HoplogListener {
-
-  private List<HoplogListener> otherListeners = new CopyOnWriteArrayList<HoplogListener>();
-
-  public HoplogListenerForRegion() {
-    
-  }
-
-  @Override
-  public void hoplogCreated(String regionFolder, int bucketId,
-      Hoplog... oplogs) throws IOException {
-    for (HoplogListener listener : this.otherListeners) {
-      listener.hoplogCreated(regionFolder, bucketId, oplogs);
-    }
-  }
-
-  @Override
-  public void hoplogDeleted(String regionFolder, int bucketId,
-      Hoplog... oplogs) {
-    for (HoplogListener listener : this.otherListeners) {
-      try {
-        listener.hoplogDeleted(regionFolder, bucketId, oplogs);
-      } catch (IOException e) {
-        // TODO handle
-        throw new HDFSIOException(e.getLocalizedMessage(), e);
-      }
-    }
-  }
-
-  public void addListener(HoplogListener listener) {
-    this.otherListeners.add(listener);
-  }
-
-  @Override
-  public void compactionCompleted(String region, int bucket, boolean isMajor) {
-    for (HoplogListener listener : this.otherListeners) {
-      listener.compactionCompleted(region, bucket, isMajor);
-    }
-  }
-}