You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/04/27 22:50:09 UTC
[23/25] incubator-geode git commit: GEODE-10: Reinstating HDFS
persistence code
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSParallelGatewaySenderQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSParallelGatewaySenderQueue.java
new file mode 100644
index 0000000..1e6a034
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSParallelGatewaySenderQueue.java
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+
+import com.gemstone.gemfire.cache.CacheException;
+import com.gemstone.gemfire.cache.EntryNotFoundException;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.i18n.LogWriterI18n;
+import com.gemstone.gemfire.internal.SystemTimer;
+import com.gemstone.gemfire.internal.SystemTimer.SystemTimerTask;
+import com.gemstone.gemfire.internal.cache.ColocationHelper;
+import com.gemstone.gemfire.internal.cache.ForceReattemptException;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl;
+import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
+
+/**
+ * Parallel Gateway Sender Queue extended for HDFS functionality
+ *
+ */
+public class HDFSParallelGatewaySenderQueue extends ParallelGatewaySenderQueue {
+
+ private int currentBucketIndex = 0;
+ private int elementsPeekedAcrossBuckets = 0;
+ private SystemTimer rollListTimer = null;
+ public static final String ROLL_SORTED_LIST_TIME_INTERVAL_MS__PROP = "gemfire.ROLL_SORTED_LIST_TIME_INTERVAL_MS";
+ private final int ROLL_SORTED_LIST_TIME_INTERVAL_MS = Integer.getInteger(ROLL_SORTED_LIST_TIME_INTERVAL_MS__PROP, 3000);
+
+ public HDFSParallelGatewaySenderQueue(AbstractGatewaySender sender,
+ Set<Region> userPRs, int idx, int nDispatcher) {
+
+ super(sender, userPRs, idx, nDispatcher);
+ //only first dispatcher Hemant?
+ if (sender.getBucketSorted() && this.index == 0) {
+ rollListTimer = new SystemTimer(sender.getCache().getDistributedSystem(),
+ true);
+ // schedule the task to roll the skip lists
+ rollListTimer.scheduleAtFixedRate(new RollSortedListsTimerTask(),
+ ROLL_SORTED_LIST_TIME_INTERVAL_MS, ROLL_SORTED_LIST_TIME_INTERVAL_MS);
+ }
+ }
+
+ @Override
+ public Object peek() throws InterruptedException, CacheException {
+ /* If you call peek and use super.peek it leads to the following exception.
+ * So I'm adding an explicit UnsupportedOperationException.
+ Caused by: java.lang.ClassCastException: com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue cannot be cast to com.gemstone.gemfire.internal.cache.BucketRegionQueue
+ at com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue.getRandomPrimaryBucket(ParallelGatewaySenderQueue.java:964)
+ at com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue.peek(ParallelGatewaySenderQueue.java:1078)
+ */
+ throw new UnsupportedOperationException();
+ }
+
+
+ @Override
+ public void cleanUp() {
+ super.cleanUp();
+ cancelRollListTimer();
+ }
+
+ private void cancelRollListTimer() {
+ if (rollListTimer != null) {
+ rollListTimer.cancel();
+ rollListTimer = null;
+ }
+ }
+ /**
+ * A call to this function peeks elements from the first local primary bucket.
+ * Next call to this function peeks elements from the next local primary
+ * bucket and so on.
+ */
+ @Override
+ public List peek(int batchSize, int timeToWait) throws InterruptedException,
+ CacheException {
+
+ List batch = new ArrayList();
+
+ int batchSizeInBytes = batchSize*1024*1024;
+ PartitionedRegion prQ = getRandomShadowPR();
+ if (prQ == null || prQ.getLocalMaxMemory() == 0) {
+ try {
+ Thread.sleep(50);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ blockProcesorThreadIfRequired();
+ return batch;
+ }
+
+ ArrayList list = null;
+ ArrayList<Integer> pbuckets = new ArrayList<Integer>(prQ
+ .getDataStore().getAllLocalPrimaryBucketIds());
+ ArrayList<Integer> buckets = new ArrayList<Integer>();
+ for(Integer i : pbuckets) {
+ if(i % this.nDispatcher == this.index)
+ buckets.add(i);
+ }
+ // In case of failures, peekedEvents would possibly have some elements
+ // add them.
+ if (this.resetLastPeeked) {
+ int previousBucketId = -1;
+ boolean stillPrimary = true;
+ Iterator<GatewaySenderEventImpl> iter = peekedEvents.iterator();
+ // we need to remove the events of the bucket that are no more primary on
+ // this node as they cannot be persisted from this node.
+ while(iter.hasNext()) {
+ HDFSGatewayEventImpl hdfsEvent = (HDFSGatewayEventImpl)iter.next();
+ if (previousBucketId != hdfsEvent.getBucketId()){
+ stillPrimary = buckets.contains(hdfsEvent.getBucketId());
+ previousBucketId = hdfsEvent.getBucketId();
+ }
+ if (stillPrimary)
+ batch.add(hdfsEvent);
+ else {
+ iter.remove();
+ }
+ }
+ this.resetLastPeeked = false;
+ }
+
+ if (buckets.size() == 0) {
+ // Sleep a bit before trying again. provided by Dan
+ try {
+ Thread.sleep(50);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ return batch;
+ }
+
+ if (this.sender.getBucketSorted()) {
+
+ }
+
+ // Each call to this function returns index of next bucket
+ // that is to be processed. This function takes care
+ // of the bucket sequence that is peeked by a sequence of
+ // peek calls.
+ // If there are bucket movements between two consecutive
+ // calls to this function then there is chance that a bucket
+ // is processed twice while another one is skipped. But, that is
+ // ok because in the next round, it will be processed.
+ Integer bIdIndex = getCurrentBucketIndex(buckets.size());
+
+ // If we have gone through all the buckets once and no
+ // elements were peeked from any of the buckets, take a nap.
+ // This always sleep in the first call but that should be ok
+ // because the timeToWait in practical use cases would be greater
+ // than this sleep of 100 ms.
+ if (bIdIndex == 0 && getAndresetElementsPeekedAcrossBuckets() == 0) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ HDFSBucketRegionQueue hrq = ((HDFSBucketRegionQueue)prQ
+ .getDataStore().getLocalBucketById(buckets.get(bIdIndex)));
+
+ if (hrq == null) {
+ // bucket moved to another node after getAllLocalPrimaryBucketIds
+ // was called. Peeking not possible. return.
+ return batch;
+ }
+ long entriesWaitingTobePeeked = hrq.totalEntries();
+
+ if (entriesWaitingTobePeeked == 0) {
+ blockProcesorThreadIfRequired();
+ return batch;
+ }
+
+ long currentTimeInMillis = System.currentTimeMillis();
+ long bucketSizeInBytes = hrq.getQueueSizeInBytes();
+ if (((currentTimeInMillis - hrq.getLastPeekTimeInMillis()) > timeToWait)
+ || ( bucketSizeInBytes > batchSizeInBytes)
+ || hrq.shouldDrainImmediately()) {
+ // peek now
+ if (logger.isDebugEnabled()) {
+ logger.debug("Peeking queue " + hrq.getId() + ": bucketSizeInBytes " + bucketSizeInBytes
+ + ": batchSizeInBytes" + batchSizeInBytes
+ + ": timeToWait" + timeToWait
+ + ": (currentTimeInMillis - hrq.getLastPeekTimeInMillis())" + (currentTimeInMillis - hrq.getLastPeekTimeInMillis()));
+ }
+
+ list = peekAhead(buckets.get(bIdIndex), hrq);
+
+ if (list != null && list.size() != 0 ) {
+ for (Object object : list) {
+ batch.add(object);
+ peekedEvents.add((HDFSGatewayEventImpl)object);
+ }
+ }
+ }
+ else {
+ blockProcesorThreadIfRequired();
+ }
+ if (logger.isDebugEnabled() && batch.size() > 0) {
+ logger.debug(this + ": Peeked a batch of " + batch.size() + " entries");
+ }
+
+ setElementsPeekedAcrossBuckets(batch.size());
+
+ return batch;
+ }
+
+ /**
+ * This function maintains an index of the last processed bucket.
+ * When it is called, it returns index of the next bucket.
+ * @param totalBuckets
+ * @return current bucket index
+ */
+ private int getCurrentBucketIndex(int totalBuckets) {
+ int retBucket = currentBucketIndex;
+ if (retBucket >= totalBuckets) {
+ currentBucketIndex = 0;
+ retBucket = 0;
+ }
+
+ currentBucketIndex++;
+
+ return retBucket;
+ }
+
+ @Override
+ public void remove(int batchSize) throws CacheException {
+ int destroyed = 0;
+ HDFSGatewayEventImpl event = null;
+
+ if (this.peekedEvents.size() > 0)
+ event = (HDFSGatewayEventImpl)this.peekedEvents.remove();
+
+ while (event != null && destroyed < batchSize) {
+ Region currentRegion = event.getRegion();
+ int currentBucketId = event.getBucketId();
+ int bucketId = event.getBucketId();
+
+ ArrayList<HDFSGatewayEventImpl> listToDestroy = new ArrayList<HDFSGatewayEventImpl>();
+ ArrayList<Object> destroyedSeqNum = new ArrayList<Object>();
+
+ // create a batch of all the entries of a bucket
+ while (bucketId == currentBucketId) {
+ listToDestroy.add(event);
+ destroyedSeqNum.add(event.getShadowKey());
+ destroyed++;
+
+ if (this.peekedEvents.size() == 0 || (destroyed) >= batchSize) {
+ event = null;
+ break;
+ }
+
+ event = (HDFSGatewayEventImpl)this.peekedEvents.remove();
+
+ bucketId = event.getBucketId();
+
+ if (!this.sender.isRunning()){
+ if (logger.isDebugEnabled()) {
+ logger.debug("ParallelGatewaySenderQueue#remove: Cache is closing down. Ignoring remove request.");
+ }
+ return;
+ }
+ }
+ try {
+ HDFSBucketRegionQueue brq = getBucketRegionQueue((PartitionedRegion) currentRegion, currentBucketId);
+
+ if (brq != null) {
+ // destroy the entries from the bucket
+ brq.destroyKeys(listToDestroy);
+ // Adding the removed event to the map for BatchRemovalMessage
+ // We need to provide the prQ as there could be multiple
+ // queue in a PGS now.
+ PartitionedRegion prQ = brq.getPartitionedRegion();
+ addRemovedEvents(prQ, currentBucketId, destroyedSeqNum);
+ }
+
+ } catch (ForceReattemptException e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("ParallelGatewaySenderQueue#remove: " + "Got ForceReattemptException for " + this
+ + " for bucket = " + bucketId);
+ }
+ }
+ catch(EntryNotFoundException e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("ParallelGatewaySenderQueue#remove: " + "Got EntryNotFoundException for " + this
+ + " for bucket = " + bucketId );
+ }
+ }
+ }
+ }
+
+ /**
+ * Keeps a track of number of elements peeked across all buckets.
+ */
+ private void setElementsPeekedAcrossBuckets(int peekedElements) {
+ this.elementsPeekedAcrossBuckets +=peekedElements;
+ }
+
+ /**
+ * Returns the number of elements peeked across all buckets. Also,
+ * resets this counter.
+ */
+ private int getAndresetElementsPeekedAcrossBuckets() {
+ int peekedElements = this.elementsPeekedAcrossBuckets;
+ this.elementsPeekedAcrossBuckets = 0;
+ return peekedElements;
+ }
+
+ @Override
+ public void remove() throws CacheException {
+ throw new UnsupportedOperationException("Method HDFSParallelGatewaySenderQueue#remove is not supported");
+ }
+
+ @Override
+ public void put(Object object) throws InterruptedException, CacheException {
+ super.put(object);
+ }
+
+ protected ArrayList peekAhead(int bucketId, HDFSBucketRegionQueue hrq) throws CacheException {
+
+ if (logger.isDebugEnabled()) {
+ logger.debug(this + ": Peekahead for the bucket " + bucketId);
+ }
+ ArrayList list = hrq.peekABatch();
+ if (logger.isDebugEnabled() && list != null ) {
+ logger.debug(this + ": Peeked" + list.size() + "objects from bucket " + bucketId);
+ }
+
+ return list;
+ }
+
+ @Override
+ public Object take() {
+ throw new UnsupportedOperationException("take() is not supported for " + HDFSParallelGatewaySenderQueue.class.toString());
+ }
+
+ protected boolean isUsedForHDFS()
+ {
+ return true;
+ }
+
+ @Override
+ protected void afterRegionAdd (PartitionedRegion userPR) {
+ }
+
+ /**
+ * gets the value for region key from the HDFSBucketRegionQueue
+ * @param region
+ * @throws ForceReattemptException
+ */
+ public HDFSGatewayEventImpl get(PartitionedRegion region, byte[] regionKey, int bucketId) throws ForceReattemptException {
+ try {
+ HDFSBucketRegionQueue brq = getBucketRegionQueue(region, bucketId);
+
+ if (brq ==null)
+ return null;
+
+ return brq.getObjectForRegionKey(region, regionKey);
+ } catch(EntryNotFoundException e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("HDFSParallelGatewaySenderQueue#get: " + "Got EntryNotFoundException for " + this
+ + " for bucket = " + bucketId);
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public void clear(PartitionedRegion pr, int bucketId) {
+ HDFSBucketRegionQueue brq;
+ try {
+ brq = getBucketRegionQueue(pr, bucketId);
+ if (brq == null)
+ return;
+ brq.clear();
+ } catch (ForceReattemptException e) {
+ //do nothing, bucket was destroyed.
+ }
+ }
+
+ @Override
+ public int size(PartitionedRegion pr, int bucketId) throws ForceReattemptException {
+ HDFSBucketRegionQueue hq = getBucketRegionQueue(pr, bucketId);
+ return hq.size();
+ }
+
+ public HDFSBucketRegionQueue getBucketRegionQueue(PartitionedRegion region,
+ int bucketId) throws ForceReattemptException {
+ PartitionedRegion leader = ColocationHelper.getLeaderRegion(region);
+ if (leader == null)
+ return null;
+ String leaderregionPath = leader.getFullPath();
+ PartitionedRegion prQ = this.userRegionNameToshadowPRMap.get(leaderregionPath);
+ if (prQ == null)
+ return null;
+ HDFSBucketRegionQueue brq;
+
+ brq = ((HDFSBucketRegionQueue)prQ.getDataStore()
+ .getLocalBucketById(bucketId));
+ if(brq == null) {
+ prQ.getRegionAdvisor().waitForLocalBucketStorage(bucketId);
+ }
+ brq = ((HDFSBucketRegionQueue)prQ.getDataStore()
+ .getInitializedBucketForId(null, bucketId));
+ return brq;
+ }
+
+ /**
+ * This class has the responsibility of rolling the lists of Sorted event
+ * Queue. The rolling of lists by a separate thread is required because
+ * neither put thread nor the peek/remove thread can do that. Put thread
+ * cannot do it because that would mean doing some synchronization with
+ * other put threads and peek thread that would hamper the put latency.
+ * Peek thread cannot do it because if the event insert rate is too high
+ * the list size can go way beyond what its size.
+ *
+ */
+ class RollSortedListsTimerTask extends SystemTimerTask {
+
+
+ /**
+ * This function ensures that if any of the buckets has lists that are beyond
+ * its size, they gets rolled over into new skip lists.
+ */
+ @Override
+ public void run2() {
+ Set<PartitionedRegion> prQs = getRegions();
+ for (PartitionedRegion prQ : prQs) {
+ ArrayList<Integer> buckets = new ArrayList<Integer>(prQ
+ .getDataStore().getAllLocalPrimaryBucketIds());
+ for (Integer bId : buckets) {
+ HDFSBucketRegionQueue hrq = ((HDFSBucketRegionQueue)prQ
+ .getDataStore().getLocalBucketById(bId));
+ if (hrq == null) {
+ // bucket moved to another node after getAllLocalPrimaryBucketIds
+ // was called. continue fixing the next bucket.
+ continue;
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Rolling over the list for bucket id: " + bId);
+ }
+ hrq.rolloverSkipList();
+ }
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreConfigHolder.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreConfigHolder.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreConfigHolder.java
new file mode 100644
index 0000000..16d3d87
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreConfigHolder.java
@@ -0,0 +1,559 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.Serializable;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.GemFireConfigException;
+import com.gemstone.gemfire.cache.hdfs.HDFSStore;
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory;
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator;
+import com.gemstone.gemfire.cache.hdfs.StoreExistsException;
+import com.gemstone.gemfire.internal.cache.xmlcache.CacheXml;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+
+/**
+ * Class to hold all hdfs store related configuration. Instead of copying the
+ * same members in two different classes, factory and store, this class will be
+ * used. The idea is let HdfsStoreImpl and HdfsStoreCreation delegate get calls,
+ * set calls and copy constructor calls this class. Moreover this config holder
+ * can be entirely replaced to support alter config
+ *
+ */
+public class HDFSStoreConfigHolder implements HDFSStore, HDFSStoreFactory ,Serializable {
+ private String name = null;
+ private String namenodeURL = null;
+ private String homeDir = DEFAULT_HOME_DIR;
+ private String clientConfigFile = null;
+ private float blockCacheSize = DEFAULT_BLOCK_CACHE_SIZE;
+ private int maxFileSize = DEFAULT_WRITE_ONLY_FILE_SIZE_LIMIT;
+ private int fileRolloverInterval = DEFAULT_WRITE_ONLY_FILE_ROLLOVER_INTERVAL;
+ protected boolean isAutoCompact = DEFAULT_MINOR_COMPACTION;
+ protected boolean autoMajorCompact = DEFAULT_MAJOR_COMPACTION;
+ protected int maxConcurrency = DEFAULT_MINOR_COMPACTION_THREADS;
+ protected int majorCompactionConcurrency = DEFAULT_MAJOR_COMPACTION_THREADS;
+ protected int majorCompactionIntervalMins = DEFAULT_MAJOR_COMPACTION_INTERVAL_MINS;
+ protected int maxInputFileSizeMB = DEFAULT_INPUT_FILE_SIZE_MAX_MB;
+ protected int maxInputFileCount = DEFAULT_INPUT_FILE_COUNT_MAX;
+ protected int minInputFileCount = DEFAULT_INPUT_FILE_COUNT_MIN;
+ protected int oldFileCleanupIntervalMins = DEFAULT_OLD_FILE_CLEANUP_INTERVAL_MINS;
+
+ protected int batchSize = DEFAULT_BATCH_SIZE_MB;
+ protected int batchIntervalMillis = DEFAULT_BATCH_INTERVAL_MILLIS;
+ protected int maximumQueueMemory = DEFAULT_MAX_BUFFER_MEMORY;
+ protected boolean isPersistenceEnabled = DEFAULT_BUFFER_PERSISTANCE;
+ protected String diskStoreName = null;
+ protected boolean diskSynchronous = DEFAULT_DISK_SYNCHRONOUS;
+ protected int dispatcherThreads = DEFAULT_DISPATCHER_THREADS;
+
+ private static final Logger logger = LogService.getLogger();
+ protected final String logPrefix;
+
+ public HDFSStoreConfigHolder() {
+ this(null);
+ }
+
+ /**
+ * @param config configuration source for creating this instance
+ */
+ public HDFSStoreConfigHolder(HDFSStore config) {
+ this.logPrefix = "<" + getName() + "> ";
+ if (config == null) {
+ return;
+ }
+
+ this.name = config.getName();
+ this.namenodeURL = config.getNameNodeURL();
+ this.homeDir = config.getHomeDir();
+ this.clientConfigFile = config.getHDFSClientConfigFile();
+ this.blockCacheSize = config.getBlockCacheSize();
+ this.maxFileSize = config.getWriteOnlyFileRolloverSize();
+ this.fileRolloverInterval = config.getWriteOnlyFileRolloverInterval();
+ isAutoCompact = config.getMinorCompaction();
+ maxConcurrency = config.getMinorCompactionThreads();
+ autoMajorCompact = config.getMajorCompaction();
+ majorCompactionConcurrency = config.getMajorCompactionThreads();
+ majorCompactionIntervalMins = config.getMajorCompactionInterval();
+ maxInputFileSizeMB = config.getInputFileSizeMax();
+ maxInputFileCount = config.getInputFileCountMax();
+ minInputFileCount = config.getInputFileCountMin();
+ oldFileCleanupIntervalMins = config.getPurgeInterval();
+
+ batchSize = config.getBatchSize();
+ batchIntervalMillis = config.getBatchInterval();
+ maximumQueueMemory = config.getMaxMemory();
+ isPersistenceEnabled = config.getBufferPersistent();
+ diskStoreName = config.getDiskStoreName();
+ diskSynchronous = config.getSynchronousDiskWrite();
+ dispatcherThreads = config.getDispatcherThreads();
+ }
+
+ public void resetDefaultValues() {
+ name = null;
+ namenodeURL = null;
+ homeDir = null;
+ clientConfigFile = null;
+ blockCacheSize = -1f;
+ maxFileSize = -1;
+ fileRolloverInterval = -1;
+
+ isAutoCompact = false;
+ maxConcurrency = -1;
+ maxInputFileSizeMB = -1;
+ maxInputFileCount = -1;
+ minInputFileCount = -1;
+ oldFileCleanupIntervalMins = -1;
+
+ autoMajorCompact = false;
+ majorCompactionConcurrency = -1;
+ majorCompactionIntervalMins = -1;
+
+ batchSize = -1;
+ batchIntervalMillis = -1;
+ maximumQueueMemory = -1;
+ isPersistenceEnabled = false;
+ diskStoreName = null;
+ diskSynchronous = false;
+ dispatcherThreads = -1;
+ }
+
+ public void copyFrom(HDFSStoreMutator mutator) {
+ if (mutator.getWriteOnlyFileRolloverInterval() >= 0) {
+ logAttrMutation("fileRolloverInterval", mutator.getWriteOnlyFileRolloverInterval());
+ setWriteOnlyFileRolloverInterval(mutator.getWriteOnlyFileRolloverInterval());
+ }
+ if (mutator.getWriteOnlyFileRolloverSize() >= 0) {
+ logAttrMutation("MaxFileSize", mutator.getWriteOnlyFileRolloverInterval());
+ setWriteOnlyFileRolloverSize(mutator.getWriteOnlyFileRolloverSize());
+ }
+
+ if (mutator.getMinorCompaction() != null) {
+ logAttrMutation("MinorCompaction", mutator.getMinorCompaction());
+ setMinorCompaction(mutator.getMinorCompaction());
+ }
+
+ if (mutator.getMinorCompactionThreads() >= 0) {
+ logAttrMutation("MaxThreads", mutator.getMinorCompactionThreads());
+ setMinorCompactionThreads(mutator.getMinorCompactionThreads());
+ }
+
+ if (mutator.getMajorCompactionInterval() > -1) {
+ logAttrMutation("MajorCompactionIntervalMins", mutator.getMajorCompactionInterval());
+ setMajorCompactionInterval(mutator.getMajorCompactionInterval());
+ }
+ if (mutator.getMajorCompactionThreads() >= 0) {
+ logAttrMutation("MajorCompactionMaxThreads", mutator.getMajorCompactionThreads());
+ setMajorCompactionThreads(mutator.getMajorCompactionThreads());
+ }
+ if (mutator.getMajorCompaction() != null) {
+ logAttrMutation("AutoMajorCompaction", mutator.getMajorCompaction());
+ setMajorCompaction(mutator.getMajorCompaction());
+ }
+ if (mutator.getInputFileCountMax() >= 0) {
+ logAttrMutation("maxInputFileCount", mutator.getInputFileCountMax());
+ setInputFileCountMax(mutator.getInputFileCountMax());
+ }
+ if (mutator.getInputFileSizeMax() >= 0) {
+ logAttrMutation("MaxInputFileSizeMB", mutator.getInputFileSizeMax());
+ setInputFileSizeMax(mutator.getInputFileSizeMax());
+ }
+ if (mutator.getInputFileCountMin() >= 0) {
+ logAttrMutation("MinInputFileCount", mutator.getInputFileCountMin());
+ setInputFileCountMin(mutator.getInputFileCountMin());
+ }
+ if (mutator.getPurgeInterval() >= 0) {
+ logAttrMutation("OldFilesCleanupIntervalMins", mutator.getPurgeInterval());
+ setPurgeInterval(mutator.getPurgeInterval());
+ }
+
+ if (mutator.getBatchSize() >= 0) {
+ logAttrMutation("batchSizeMB", mutator.getWriteOnlyFileRolloverInterval());
+ setBatchSize(mutator.getBatchSize());
+ }
+ if (mutator.getBatchInterval() >= 0) {
+ logAttrMutation("batchTimeInterval", mutator.getWriteOnlyFileRolloverInterval());
+ setBatchInterval(mutator.getBatchInterval());
+ }
+ }
+
+ void logAttrMutation(String name, Object value) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}Alter " + name + ":" + value, logPrefix);
+ }
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+ @Override
+ public HDFSStoreFactory setName(String name) {
+ this.name = name;
+ return this;
+ }
+
+ @Override
+ public String getNameNodeURL() {
+ return namenodeURL;
+ }
+ @Override
+ public HDFSStoreFactory setNameNodeURL(String namenodeURL) {
+ this.namenodeURL = namenodeURL;
+ return this;
+ }
+
+ @Override
+ public String getHomeDir() {
+ return homeDir;
+ }
+ @Override
+ public HDFSStoreFactory setHomeDir(String homeDir) {
+ this.homeDir = homeDir;
+ return this;
+ }
+
+ @Override
+ public String getHDFSClientConfigFile() {
+ return clientConfigFile;
+ }
+ @Override
+ public HDFSStoreFactory setHDFSClientConfigFile(String clientConfigFile) {
+ this.clientConfigFile = clientConfigFile;
+ return this;
+ }
+
+ @Override
+ public HDFSStoreFactory setBlockCacheSize(float percentage) {
+ if(percentage < 0 || percentage > 100) {
+ throw new IllegalArgumentException("Block cache size must be between 0 and 100, inclusive");
+ }
+ this.blockCacheSize = percentage;
+ return this;
+ }
+
+ @Override
+ public float getBlockCacheSize() {
+ return blockCacheSize;
+ }
+
+ @Override
+ public HDFSStoreFactory setWriteOnlyFileRolloverSize(int maxFileSize) {
+ assertIsPositive(CacheXml.HDFS_WRITE_ONLY_FILE_ROLLOVER_INTERVAL, maxFileSize);
+ this.maxFileSize = maxFileSize;
+ return this;
+ }
+ @Override
+ public int getWriteOnlyFileRolloverSize() {
+ return maxFileSize;
+ }
+
+ @Override
+ public HDFSStoreFactory setWriteOnlyFileRolloverInterval(int count) {
+ assertIsPositive(CacheXml.HDFS_TIME_FOR_FILE_ROLLOVER, count);
+ this.fileRolloverInterval = count;
+ return this;
+ }
+ @Override
+ public int getWriteOnlyFileRolloverInterval() {
+ return fileRolloverInterval;
+ }
+
+ @Override
+ public boolean getMinorCompaction() {
+ return isAutoCompact;
+ }
+ @Override
+ public HDFSStoreFactory setMinorCompaction(boolean auto) {
+ this.isAutoCompact = auto;
+ return this;
+ }
+
+ @Override
+ public HDFSStoreFactory setMinorCompactionThreads(int count) {
+ assertIsPositive(CacheXml.HDFS_MINOR_COMPACTION_THREADS, count);
+ this.maxConcurrency = count;
+ return this;
+ }
+ @Override
+ public int getMinorCompactionThreads() {
+ return maxConcurrency;
+ }
+
+ @Override
+ public HDFSStoreFactory setMajorCompaction(boolean auto) {
+ this.autoMajorCompact = auto;
+ return this;
+ }
+ @Override
+ public boolean getMajorCompaction() {
+ return autoMajorCompact;
+ }
+
+ @Override
+ public HDFSStoreFactory setMajorCompactionInterval(int count) {
+ HDFSStoreCreation.assertIsPositive(CacheXml.HDFS_MAJOR_COMPACTION_INTERVAL, count);
+ this.majorCompactionIntervalMins = count;
+ return this;
+ }
+ @Override
+ public int getMajorCompactionInterval() {
+ return majorCompactionIntervalMins;
+ }
+
+ @Override
+ public HDFSStoreFactory setMajorCompactionThreads(int count) {
+ HDFSStoreCreation.assertIsPositive(CacheXml.HDFS_MAJOR_COMPACTION_THREADS, count);
+ this.majorCompactionConcurrency = count;
+ return this;
+ }
+ @Override
+ public int getMajorCompactionThreads() {
+ return majorCompactionConcurrency;
+ }
+
+ @Override
+ public HDFSStoreFactory setInputFileSizeMax(int size) {
+ HDFSStoreCreation.assertIsPositive("HDFS_COMPACTION_MAX_INPUT_FILE_SIZE_MB", size);
+ this.maxInputFileSizeMB = size;
+ return this;
+ }
+ @Override
+ public int getInputFileSizeMax() {
+ return maxInputFileSizeMB;
+ }
+
+ @Override
+ public HDFSStoreFactory setInputFileCountMin(int count) {
+ HDFSStoreCreation.assertIsPositive("HDFS_COMPACTION_MIN_INPUT_FILE_COUNT", count);
+ this.minInputFileCount = count;
+ return this;
+ }
+ @Override
+ public int getInputFileCountMin() {
+ return minInputFileCount;
+ }
+
+ @Override
+ public HDFSStoreFactory setInputFileCountMax(int count) {
+ HDFSStoreCreation.assertIsPositive("HDFS_COMPACTION_MAX_INPUT_FILE_COUNT", count);
+ this.maxInputFileCount = count;
+ return this;
+ }
+ @Override
+ public int getInputFileCountMax() {
+ return maxInputFileCount;
+ }
+
+ @Override
+ public int getPurgeInterval() {
+ return oldFileCleanupIntervalMins ;
+ }
+ @Override
+ public HDFSStoreFactory setPurgeInterval(int interval) {
+ assertIsPositive(CacheXml.HDFS_PURGE_INTERVAL, interval);
+ this.oldFileCleanupIntervalMins = interval;
+ return this;
+ }
+
+ protected void validate() {
+ if (minInputFileCount > maxInputFileCount) {
+ throw new IllegalArgumentException(
+ LocalizedStrings.HOPLOG_MIN_IS_MORE_THAN_MAX
+ .toLocalizedString(new Object[] {
+ "HDFS_COMPACTION_MIN_INPUT_FILE_COUNT",
+ minInputFileCount,
+ "HDFS_COMPACTION_MAX_INPUT_FILE_COUNT",
+ maxInputFileCount }));
+ }
+ }
+
+ /**
+ * This method should not be called on this class.
+ * @see HDFSStoreFactory#create(String)
+ */
+ @Override
+ public HDFSStore create(String name) throws GemFireConfigException,
+ StoreExistsException {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * This method should not be called on this class.
+ * @see HDFSStoreImpl#destroy()
+ */
+ @Override
+ public void destroy() {
+ throw new UnsupportedOperationException();
+ }
+
+ public static void assertIsPositive(String name, int count) {
+ if (count < 1) {
+ throw new IllegalArgumentException(
+ LocalizedStrings.DiskWriteAttributesImpl_0_HAS_TO_BE_POSITIVE_NUMBER_AND_THE_VALUE_GIVEN_1_IS_NOT_ACCEPTABLE
+ .toLocalizedString(new Object[] { name, count }));
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("HDFSStoreConfigHolder@");
+ builder.append(System.identityHashCode(this));
+ builder.append(" [");
+ appendStrProp(builder, name, "name");
+ appendStrProp(builder, namenodeURL, "namenodeURL");
+ appendStrProp(builder, homeDir, "homeDir");
+ appendStrProp(builder, clientConfigFile, "clientConfigFile");
+ if (blockCacheSize > -1) {
+ builder.append("blockCacheSize=");
+ builder.append(blockCacheSize);
+ builder.append(", ");
+ }
+ appendIntProp(builder, maxFileSize, "maxFileSize");
+ appendIntProp(builder, fileRolloverInterval, "fileRolloverInterval");
+ appendBoolProp(builder, isAutoCompact, "isAutoCompact");
+ appendBoolProp(builder, autoMajorCompact, "autoMajorCompact");
+ appendIntProp(builder, maxConcurrency, "maxConcurrency");
+ appendIntProp(builder, majorCompactionConcurrency, "majorCompactionConcurrency");
+ appendIntProp(builder, majorCompactionIntervalMins, "majorCompactionIntervalMins");
+ appendIntProp(builder, maxInputFileSizeMB, "maxInputFileSizeMB");
+ appendIntProp(builder, maxInputFileCount, "maxInputFileCount");
+ appendIntProp(builder, minInputFileCount, "minInputFileCount");
+ appendIntProp(builder, oldFileCleanupIntervalMins, "oldFileCleanupIntervalMins");
+ appendIntProp(builder, batchSize, "batchSize");
+ appendIntProp(builder, batchIntervalMillis, "batchInterval");
+ appendIntProp(builder, maximumQueueMemory, "maximumQueueMemory");
+ appendIntProp(builder, dispatcherThreads, "dispatcherThreads");
+ appendBoolProp(builder, isPersistenceEnabled, "isPersistenceEnabled");
+ appendStrProp(builder, diskStoreName, "diskStoreName");
+ appendBoolProp(builder, diskSynchronous, "diskSynchronous");
+
+ builder.append("]");
+ return builder.toString();
+ }
+
+ private void appendStrProp(StringBuilder builder, String value, String name) {
+ if (value != null) {
+ builder.append(name + "=");
+ builder.append(value);
+ builder.append(", ");
+ }
+ }
+
+ private void appendIntProp(StringBuilder builder, int value, String name) {
+ if (value > -1) {
+ builder.append(name + "=");
+ builder.append(value);
+ builder.append(", ");
+ }
+ }
+
+ private void appendBoolProp(StringBuilder builder, boolean value, String name) {
+ builder.append(name + "=");
+ builder.append(value);
+ builder.append(", ");
+ }
+
+ @Override
+ public HDFSStoreMutator createHdfsStoreMutator() {
+ // as part of alter execution, hdfs store will replace the config holder
+ // completely. Hence mutator at the config holder is not needed
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public HDFSStore alter(HDFSStoreMutator mutator) {
+ // as part of alter execution, hdfs store will replace the config holder
+ // completely. Hence mutator at the config holder is not needed
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String getDiskStoreName() {
+ return this.diskStoreName;
+ }
+ @Override
+ public HDFSStoreFactory setDiskStoreName(String name) {
+ this.diskStoreName = name;
+ return this;
+ }
+
+ @Override
+ public int getBatchInterval() {
+ return this.batchIntervalMillis;
+ }
+ @Override
+ public HDFSStoreFactory setBatchInterval(int intervalMillis){
+ this.batchIntervalMillis = intervalMillis;
+ return this;
+ }
+
+ @Override
+ public boolean getBufferPersistent() {
+ return isPersistenceEnabled;
+ }
+ @Override
+ public HDFSStoreFactory setBufferPersistent(boolean isPersistent) {
+ this.isPersistenceEnabled = isPersistent;
+ return this;
+ }
+
+ @Override
+ public int getDispatcherThreads() {
+ return dispatcherThreads;
+ }
+ @Override
+ public HDFSStoreFactory setDispatcherThreads(int dispatcherThreads) {
+ this.dispatcherThreads = dispatcherThreads;
+ return this;
+ }
+
+ @Override
+ public int getMaxMemory() {
+ return this.maximumQueueMemory;
+ }
+ @Override
+ public HDFSStoreFactory setMaxMemory(int memory) {
+ this.maximumQueueMemory = memory;
+ return this;
+ }
+
+ @Override
+ public int getBatchSize() {
+ return this.batchSize;
+ }
+ @Override
+ public HDFSStoreFactory setBatchSize(int size){
+ this.batchSize = size;
+ return this;
+ }
+
+ @Override
+ public boolean getSynchronousDiskWrite() {
+ return this.diskSynchronous;
+ }
+ @Override
+ public HDFSStoreFactory setSynchronousDiskWrite(boolean isSynchronous) {
+ this.diskSynchronous = isSynchronous;
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreCreation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreCreation.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreCreation.java
new file mode 100644
index 0000000..9ecc5e3
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreCreation.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import com.gemstone.gemfire.GemFireConfigException;
+import com.gemstone.gemfire.cache.hdfs.HDFSStore;
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory;
+import com.gemstone.gemfire.cache.hdfs.StoreExistsException;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+
+/**
+ */
+public class HDFSStoreCreation implements HDFSStoreFactory {
+ protected HDFSStoreConfigHolder configHolder;
+
+ public HDFSStoreCreation() {
+ this(null);
+ }
+
+ /**
+ * Copy constructor for HDFSStoreCreation
+ * @param config configuration source for creating this instance
+ */
+ public HDFSStoreCreation(HDFSStoreCreation config) {
+ this.configHolder = new HDFSStoreConfigHolder(config == null ? null : config.configHolder);
+ }
+
+ @Override
+ public HDFSStoreFactory setName(String name) {
+ configHolder.setName(name);
+ return this;
+ }
+
+ @Override
+ public HDFSStoreFactory setNameNodeURL(String namenodeURL) {
+ configHolder.setNameNodeURL(namenodeURL);
+ return this;
+ }
+
+ @Override
+ public HDFSStoreFactory setHomeDir(String homeDir) {
+ configHolder.setHomeDir(homeDir);
+ return this;
+ }
+
+ @Override
+ public HDFSStoreFactory setHDFSClientConfigFile(String clientConfigFile) {
+ configHolder.setHDFSClientConfigFile(clientConfigFile);
+ return this;
+ }
+
+ @Override
+ public HDFSStoreFactory setBlockCacheSize(float percentage) {
+ configHolder.setBlockCacheSize(percentage);
+ return this;
+ }
+
+ @Override
+ public HDFSStoreFactory setWriteOnlyFileRolloverSize(int maxFileSize) {
+ configHolder.setWriteOnlyFileRolloverSize(maxFileSize);
+ return this;
+ }
+
+ @Override
+ public HDFSStoreFactory setWriteOnlyFileRolloverInterval(int count) {
+ configHolder.setWriteOnlyFileRolloverInterval(count);
+ return this;
+ }
+
+ @Override
+ public HDFSStoreFactory setMinorCompaction(boolean auto) {
+ configHolder.setMinorCompaction(auto);
+ return this;
+ }
+
+ @Override
+ public HDFSStoreFactory setMinorCompactionThreads(int count) {
+ configHolder.setMinorCompactionThreads(count);
+ return this;
+ }
+
+ @Override
+ public HDFSStoreFactory setMajorCompaction(boolean auto) {
+ configHolder.setMajorCompaction(auto);
+ return this;
+ }
+
+ @Override
+ public HDFSStoreFactory setMajorCompactionInterval(int count) {
+ configHolder.setMajorCompactionInterval(count);
+ return this;
+ }
+
+ @Override
+ public HDFSStoreFactory setMajorCompactionThreads(int count) {
+ configHolder.setMajorCompactionThreads(count);
+ return this;
+ }
+
+ @Override
+ public HDFSStoreFactory setInputFileSizeMax(int size) {
+ configHolder.setInputFileSizeMax(size);
+ return this;
+ }
+
+ @Override
+ public HDFSStoreFactory setInputFileCountMin(int count) {
+ configHolder.setInputFileCountMin(count);
+ return this;
+ }
+
+ @Override
+ public HDFSStoreFactory setInputFileCountMax(int count) {
+ configHolder.setInputFileCountMax(count);
+ return this;
+ }
+
+ @Override
+ public HDFSStoreFactory setPurgeInterval(int interval) {
+ configHolder.setPurgeInterval(interval);
+ return this;
+ }
+
+ @Override
+ public HDFSStoreFactory setDiskStoreName(String name) {
+ configHolder.setDiskStoreName(name);
+ return this;
+ }
+
+ @Override
+ public HDFSStoreFactory setMaxMemory(int memory) {
+ configHolder.setMaxMemory(memory);
+ return this;
+ }
+
+ @Override
+ public HDFSStoreFactory setBatchInterval(int intervalMillis) {
+ configHolder.setBatchInterval(intervalMillis);
+ return this;
+ }
+
+ @Override
+ public HDFSStoreFactory setBatchSize(int size) {
+ configHolder.setBatchSize(size);
+ return this;
+ }
+
+ @Override
+ public HDFSStoreFactory setBufferPersistent(boolean isPersistent) {
+ configHolder.setBufferPersistent(isPersistent);
+ return this;
+ }
+
+ @Override
+ public HDFSStoreFactory setSynchronousDiskWrite(boolean isSynchronous) {
+ configHolder.setSynchronousDiskWrite(isSynchronous);
+ return this;
+ }
+
+ @Override
+ public HDFSStoreFactory setDispatcherThreads(int dispatcherThreads) {
+ configHolder.setDispatcherThreads(dispatcherThreads);
+ return this;
+ }
+
+ /**
+ * This method should not be called on this class.
+ * @see HDFSStoreFactory#create(String)
+ */
+ @Override
+ public HDFSStore create(String name) throws GemFireConfigException,
+ StoreExistsException {
+ throw new UnsupportedOperationException();
+ }
+
+ public static void assertIsPositive(String name, int count) {
+ if (count < 1) {
+ throw new IllegalArgumentException(
+ LocalizedStrings.DiskWriteAttributesImpl_0_HAS_TO_BE_POSITIVE_NUMBER_AND_THE_VALUE_GIVEN_1_IS_NOT_ACCEPTABLE
+ .toLocalizedString(new Object[] { name, count }));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreFactoryImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreFactoryImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreFactoryImpl.java
new file mode 100644
index 0000000..749f01c
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreFactoryImpl.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import com.gemstone.gemfire.GemFireConfigException;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.hdfs.HDFSStore;
+import com.gemstone.gemfire.cache.hdfs.StoreExistsException;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+
+
+/**
+ * Implementation of HDFSStoreFactory
+ *
+ */
+public class HDFSStoreFactoryImpl extends HDFSStoreCreation {
+ public static final String DEFAULT_ASYNC_QUEUE_ID_FOR_HDFS= "HDFS_QUEUE";
+
+ private Cache cache;
+
+ public HDFSStoreFactoryImpl(Cache cache) {
+ this(cache, null);
+ }
+
+ public HDFSStoreFactoryImpl(Cache cache, HDFSStoreCreation config) {
+ super(config);
+ this.cache = cache;
+ }
+
+ @Override
+ public HDFSStore create(String name) {
+ if (name == null) {
+ throw new GemFireConfigException("HDFS store name not provided");
+ }
+
+ this.configHolder.validate();
+
+ HDFSStore result = null;
+ synchronized (this) {
+ if (this.cache instanceof GemFireCacheImpl) {
+ GemFireCacheImpl gfc = (GemFireCacheImpl) this.cache;
+ if (gfc.findHDFSStore(name) != null) {
+ throw new StoreExistsException(name);
+ }
+
+ HDFSStoreImpl hsi = new HDFSStoreImpl(name, this.configHolder);
+ gfc.addHDFSStore(hsi);
+ result = hsi;
+ }
+ }
+ return result;
+ }
+
+ public static final String getEventQueueName(String regionPath) {
+ return HDFSStoreFactoryImpl.DEFAULT_ASYNC_QUEUE_ID_FOR_HDFS + "_"
+ + regionPath.replace('/', '_');
+ }
+
+ public HDFSStore getConfigView() {
+ return (HDFSStore) configHolder;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreImpl.java
new file mode 100644
index 0000000..b5d56b6
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreImpl.java
@@ -0,0 +1,638 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.hfile.BlockCache;
+import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.ConnectTimeoutException;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
+import com.gemstone.gemfire.cache.hdfs.HDFSStore;
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSCompactionManager;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSStoreDirector;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog.HoplogWriter;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogConfig;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.HoplogUtil;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.internal.cache.control.HeapMemoryMonitor;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.HFileStoreStatistics;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.internal.util.SingletonCallable;
+import com.gemstone.gemfire.internal.util.SingletonValue;
+import com.gemstone.gemfire.internal.util.SingletonValue.SingletonBuilder;
+
+/**
+ * Represents a HDFS based persistent store for region data.
+ *
+ */
+public class HDFSStoreImpl implements HDFSStore {
+
+ private volatile HDFSStoreConfigHolder configHolder;
+
+ private final SingletonValue<FileSystem> fs;
+
+ /**
+ * Used to make sure that only one thread creates the writer at a time. This prevents the dispatcher
+ * threads from cascading the Connection lock in DFS client see bug 51195
+ */
+ private final SingletonCallable<HoplogWriter> singletonWriter = new SingletonCallable<HoplogWriter>();
+
+ private final HFileStoreStatistics stats;
+ private final BlockCache blockCache;
+
+ private static HashSet<String> secureNameNodes = new HashSet<String>();
+
+ private final boolean PERFORM_SECURE_HDFS_CHECK = Boolean.getBoolean(HoplogConfig.PERFORM_SECURE_HDFS_CHECK_PROP);
+ private static final Logger logger = LogService.getLogger();
+ protected final String logPrefix;
+
+ static {
+ HdfsConfiguration.init();
+ }
+
+ public HDFSStoreImpl(String name, final HDFSStore config) {
+ this.configHolder = new HDFSStoreConfigHolder(config);
+ configHolder.setName(name);
+
+ this.logPrefix = "<" + "HdfsStore:" + name + "> ";
+
+ stats = new HFileStoreStatistics(InternalDistributedSystem.getAnyInstance(), "HDFSStoreStatistics", name);
+
+ final Configuration hconf = new Configuration();
+
+ // Set the block cache size.
+ // Disable the static block cache. We keep our own cache on the HDFS Store
+ // hconf.setFloat("hfile.block.cache.size", 0f);
+ if (this.getBlockCacheSize() != 0) {
+ long cacheSize = (long) (HeapMemoryMonitor.getTenuredPoolMaxMemory() * this.getBlockCacheSize() / 100);
+
+ // TODO use an off heap block cache if we're using off heap memory?
+ // See CacheConfig.instantiateBlockCache.
+ // According to Anthony, the off heap block cache is still
+ // experimental. Our own off heap cache might be a better bet.
+// this.blockCache = new LruBlockCache(cacheSize,
+// StoreFile.DEFAULT_BLOCKSIZE_SMALL, hconf, HFileSortedOplogFactory.convertStatistics(stats));
+ this.blockCache = new LruBlockCache(cacheSize, StoreFile.DEFAULT_BLOCKSIZE_SMALL, hconf);
+ } else {
+ this.blockCache = null;
+ }
+
+ final String clientFile = config.getHDFSClientConfigFile();
+ fs = new SingletonValue<FileSystem>(new SingletonBuilder<FileSystem>() {
+ @Override
+ public FileSystem create() throws IOException {
+ return createFileSystem(hconf, clientFile, false);
+ }
+
+ @Override
+ public void postCreate() {
+ }
+
+ @Override
+ public void createInProgress() {
+ }
+ });
+
+ FileSystem fileSystem = null;
+ try {
+ fileSystem = fs.get();
+ } catch (Throwable ex) {
+ throw new HDFSIOException(ex.getMessage(),ex);
+ }
+ //HDFSCompactionConfig has already been initialized
+ long cleanUpIntervalMillis = getPurgeInterval() * 60 * 1000;
+ Path cleanUpIntervalPath = new Path(getHomeDir(), HoplogConfig.CLEAN_UP_INTERVAL_FILE_NAME);
+ HoplogUtil.exposeCleanupIntervalMillis(fileSystem, cleanUpIntervalPath, cleanUpIntervalMillis);
+ }
+
+ /**
+ * Creates a new file system every time.
+ */
+ public FileSystem createFileSystem() {
+ Configuration hconf = new Configuration();
+ try {
+ return createFileSystem(hconf, this.getHDFSClientConfigFile(), true);
+ } catch (Throwable ex) {
+ throw new HDFSIOException(ex.getMessage(),ex);
+ }
+ }
+
+ private FileSystem createFileSystem(Configuration hconf, String configFile, boolean forceNew) throws IOException {
+ FileSystem filesystem = null;
+
+ // load hdfs client config file if specified. The path is on local file
+ // system
+ if (configFile != null) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}Adding resource config file to hdfs configuration:" + configFile, logPrefix);
+ }
+ hconf.addResource(new Path(configFile));
+
+ if (! new File(configFile).exists()) {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.HOPLOG_HDFS_CLIENT_CONFIG_FILE_ABSENT, configFile));
+ }
+ }
+
+ // This setting disables shutdown hook for file system object. Shutdown
+ // hook may cause FS object to close before the cache or store and
+ // unpredictable behavior. This setting is provided for GFXD like server
+ // use cases where FS close is managed by a server. This setting is not
+ // supported by old versions of hadoop, HADOOP-4829
+ hconf.setBoolean("fs.automatic.close", false);
+
+ // Hadoop has a configuration parameter io.serializations that is a list of serialization
+ // classes which can be used for obtaining serializers and deserializers. This parameter
+ // by default contains avro classes. When a sequence file is created, it calls
+ // SerializationFactory.getSerializer(keyclass). This internally creates objects using
+ // reflection of all the classes that were part of io.serializations. But since, there is
+ // no avro class available it throws an exception.
+ // Before creating a sequenceFile, override the io.serializations parameter and pass only the classes
+ // that are important to us.
+ hconf.setStrings("io.serializations",
+ new String[]{"org.apache.hadoop.io.serializer.WritableSerialization"});
+ // create writer
+
+ SchemaMetrics.configureGlobally(hconf);
+
+ String nameNodeURL = null;
+ if ((nameNodeURL = getNameNodeURL()) == null) {
+ nameNodeURL = hconf.get("fs.default.name");
+ }
+
+ URI namenodeURI = URI.create(nameNodeURL);
+
+ //if (! GemFireCacheImpl.getExisting().isHadoopGfxdLonerMode()) {
+ String authType = hconf.get("hadoop.security.authentication");
+
+ //The following code handles Gemfire XD with secure HDFS
+ //A static set is used to cache all known secure HDFS NameNode urls.
+ UserGroupInformation.setConfiguration(hconf);
+
+ //Compare authentication method ignoring case to make GFXD future version complaint
+ //At least version 2.0.2 starts complaining if the string "kerberos" is not in all small case.
+ //However it seems current version of hadoop accept the authType in any case
+ if (authType.equalsIgnoreCase("kerberos")) {
+
+ String principal = hconf.get(HoplogConfig.KERBEROS_PRINCIPAL);
+ String keyTab = hconf.get(HoplogConfig.KERBEROS_KEYTAB_FILE);
+
+ if (!PERFORM_SECURE_HDFS_CHECK) {
+ if (logger.isDebugEnabled())
+ logger.debug("{}Ignore secure hdfs check", logPrefix);
+ } else {
+ if (!secureNameNodes.contains(nameNodeURL)) {
+ if (logger.isDebugEnabled())
+ logger.debug("{}Executing secure hdfs check", logPrefix);
+ try{
+ filesystem = FileSystem.newInstance(namenodeURI, hconf);
+ //Make sure no IOExceptions are generated when accessing insecure HDFS.
+ filesystem.listFiles(new Path("/"),false);
+ throw new HDFSIOException("Gemfire XD HDFS client and HDFS cluster security levels do not match. The configured HDFS Namenode is not secured.");
+ } catch (IOException ex) {
+ secureNameNodes.add(nameNodeURL);
+ } finally {
+ //Close filesystem to avoid resource leak
+ if(filesystem != null) {
+ closeFileSystemIgnoreError(filesystem);
+ }
+ }
+ }
+ }
+
+ // check to ensure the namenode principal is defined
+ String nameNodePrincipal = hconf.get("dfs.namenode.kerberos.principal");
+ if (nameNodePrincipal == null) {
+ throw new IOException(LocalizedStrings.GF_KERBEROS_NAMENODE_PRINCIPAL_UNDEF.toLocalizedString());
+ }
+
+ // ok, the user specified a gfxd principal so we will try to login
+ if (principal != null) {
+ //If NameNode principal is the same as Gemfire XD principal, there is a
+ //potential security hole
+ String regex = "[/@]";
+ if (nameNodePrincipal != null) {
+ String HDFSUser = nameNodePrincipal.split(regex)[0];
+ String GFXDUser = principal.split(regex)[0];
+ if (HDFSUser.equals(GFXDUser)) {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.HDFS_USER_IS_SAME_AS_GF_USER, GFXDUser));
+ }
+ }
+
+ // a keytab must exist if the user specifies a principal
+ if (keyTab == null) {
+ throw new IOException(LocalizedStrings.GF_KERBEROS_KEYTAB_UNDEF.toLocalizedString());
+ }
+
+ // the keytab must exist as well
+ File f = new File(keyTab);
+ if (!f.exists()) {
+ throw new FileNotFoundException(LocalizedStrings.GF_KERBEROS_KEYTAB_FILE_ABSENT.toLocalizedString(f.getAbsolutePath()));
+ }
+
+ //Authenticate Gemfire XD principal to Kerberos KDC using Gemfire XD keytab file
+ String principalWithValidHost = SecurityUtil.getServerPrincipal(principal, "");
+ UserGroupInformation.loginUserFromKeytab(principalWithValidHost, keyTab);
+ } else {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.GF_KERBEROS_PRINCIPAL_UNDEF));
+ }
+ }
+ //}
+
+ filesystem = getFileSystemFactory().create(namenodeURI, hconf, forceNew);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}Initialized FileSystem linked to " + filesystem.getUri()
+ + " " + filesystem.hashCode(), logPrefix);
+ }
+ return filesystem;
+ }
+
+ public FileSystem getFileSystem() throws IOException {
+ return fs.get();
+ }
+
+ public FileSystem getCachedFileSystem() {
+ return fs.getCachedValue();
+ }
+
+ public SingletonCallable<HoplogWriter> getSingletonWriter() {
+ return this.singletonWriter;
+ }
+
+ private final SingletonCallable<Boolean> fsExists = new SingletonCallable<Boolean>();
+
+ public boolean checkFileSystemExists() throws IOException {
+ try {
+ return fsExists.runSerially(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ FileSystem fileSystem = getCachedFileSystem();
+ if (fileSystem == null) {
+ return false;
+ }
+ return fileSystem.exists(new Path("/"));
+ }
+ });
+ } catch (Exception e) {
+ if (e instanceof IOException) {
+ throw (IOException)e;
+ }
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * This method executes a query on namenode. If the query succeeds, FS
+ * instance is healthy. If it fails, the old instance is closed and a new
+ * instance is created.
+ */
+ public void checkAndClearFileSystem() {
+ FileSystem fileSystem = getCachedFileSystem();
+
+ if (fileSystem != null) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}Checking file system at " + fileSystem.getUri(), logPrefix);
+ }
+ try {
+ checkFileSystemExists();
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}FS client is ok: " + fileSystem.getUri() + " "
+ + fileSystem.hashCode(), logPrefix);
+ }
+ return;
+ } catch (ConnectTimeoutException e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}Hdfs unreachable, FS client is ok: "
+ + fileSystem.getUri() + " " + fileSystem.hashCode(), logPrefix);
+ }
+ return;
+ } catch (IOException e) {
+ logger.debug("IOError in filesystem checkAndClear ", e);
+
+ // The file system is closed or NN is not reachable. It is safest to
+ // create a new FS instance. If the NN continues to remain unavailable,
+ // all subsequent read/write request will cause HDFSIOException. This is
+ // similar to the way hbase manages failures. This has a drawback
+ // though. A network blip will result in all connections to be
+ // recreated. However trying to preserve the connections and waiting for
+ // FS to auto-recover is not deterministic.
+ if (e instanceof RemoteException) {
+ e = ((RemoteException) e).unwrapRemoteException();
+ }
+
+ logger.warn(LocalizedMessage.create(LocalizedStrings.HOPLOG_HDFS_UNREACHABLE,
+ fileSystem.getUri()), e);
+ }
+
+ // compare and clear FS container. The fs container needs to be reusable
+ boolean result = fs.clear(fileSystem, true);
+ if (!result) {
+ // the FS instance changed after this call was initiated. Check again
+ logger.debug("{}Failed to clear FS ! I am inconsistent so retrying ..", logPrefix);
+ checkAndClearFileSystem();
+ } else {
+ closeFileSystemIgnoreError(fileSystem);
+ }
+ }
+ }
+
+ private void closeFileSystemIgnoreError(FileSystem fileSystem) {
+ if (fileSystem == null) {
+ logger.debug("{}Trying to close null file system", logPrefix);
+ return;
+ }
+
+ try {
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}Closing file system at " + fileSystem.getUri() + " "
+ + fileSystem.hashCode(), logPrefix);
+ }
+ fileSystem.close();
+ } catch (Exception e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Failed to close file system at " + fileSystem.getUri()
+ + " " + fileSystem.hashCode(), e);
+ }
+ }
+ }
+
+ public HFileStoreStatistics getStats() {
+ return stats;
+ }
+
+ public BlockCache getBlockCache() {
+ return blockCache;
+ }
+
+ public void close() {
+ logger.debug("{}Closing file system: " + getName(), logPrefix);
+ stats.close();
+ blockCache.shutdown();
+ //Might want to clear the block cache, but it should be dereferenced.
+
+ // release DDL hoplog organizer for this store. Also shutdown compaction
+ // threads. These two resources hold references to GemfireCacheImpl
+ // instance. Any error is releasing this resources is not critical and needs
+ // be ignored.
+ try {
+ HDFSCompactionManager manager = HDFSCompactionManager.getInstance(this);
+ if (manager != null) {
+ manager.reset();
+ }
+ } catch (Exception e) {
+ logger.info(e);
+ }
+
+ // once this store is closed, this store should not be used again
+ FileSystem fileSystem = fs.clear(false);
+ if (fileSystem != null) {
+ closeFileSystemIgnoreError(fileSystem);
+ }
+ }
+
+ /**
+ * Test hook to remove all of the contents of the the folder
+ * for this HDFS store from HDFS.
+ * @throws IOException
+ */
+ public void clearFolder() throws IOException {
+ getFileSystem().delete(new Path(getHomeDir()), true);
+ }
+
+ @Override
+ public void destroy() {
+ Collection<String> regions = HDFSRegionDirector.getInstance().getRegionsInStore(this);
+ if(!regions.isEmpty()) {
+ throw new IllegalStateException("Cannot destroy a HDFS store that still contains regions: " + regions);
+ }
+ close();
+ HDFSStoreDirector.getInstance().removeHDFSStore(this.getName());
+ }
+
+ @Override
+ public synchronized HDFSStore alter(HDFSStoreMutator mutator) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}Altering hdfsStore " + this, logPrefix);
+ logger.debug("{}Mutator " + mutator, logPrefix);
+ }
+ HDFSStoreConfigHolder newHolder = new HDFSStoreConfigHolder(configHolder);
+ newHolder.copyFrom(mutator);
+ newHolder.validate();
+ HDFSStore oldStore = configHolder;
+ configHolder = newHolder;
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}Resuult of Alter " + this, logPrefix);
+ }
+ return (HDFSStore) oldStore;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("HDFSStoreImpl [");
+ if (configHolder != null) {
+ builder.append("configHolder=");
+ builder.append(configHolder);
+ }
+ builder.append("]");
+ return builder.toString();
+ }
+
+ @Override
+ public String getName() {
+ return configHolder.getName();
+ }
+
+ @Override
+ public String getNameNodeURL() {
+ return configHolder.getNameNodeURL();
+ }
+
+ @Override
+ public String getHomeDir() {
+ return configHolder.getHomeDir();
+ }
+
+ @Override
+ public String getHDFSClientConfigFile() {
+ return configHolder.getHDFSClientConfigFile();
+ }
+
+ @Override
+ public float getBlockCacheSize() {
+ return configHolder.getBlockCacheSize();
+ }
+
+ @Override
+ public int getWriteOnlyFileRolloverSize() {
+ return configHolder.getWriteOnlyFileRolloverSize();
+ }
+
+ @Override
+ public int getWriteOnlyFileRolloverInterval() {
+ return configHolder.getWriteOnlyFileRolloverInterval();
+ }
+
+ @Override
+ public boolean getMinorCompaction() {
+ return configHolder.getMinorCompaction();
+ }
+
+ @Override
+ public int getMinorCompactionThreads() {
+ return configHolder.getMinorCompactionThreads();
+ }
+
+ @Override
+ public boolean getMajorCompaction() {
+ return configHolder.getMajorCompaction();
+ }
+
+ @Override
+ public int getMajorCompactionInterval() {
+ return configHolder.getMajorCompactionInterval();
+ }
+
+ @Override
+ public int getMajorCompactionThreads() {
+ return configHolder.getMajorCompactionThreads();
+ }
+
+
+ @Override
+ public int getInputFileSizeMax() {
+ return configHolder.getInputFileSizeMax();
+ }
+
+ @Override
+ public int getInputFileCountMin() {
+ return configHolder.getInputFileCountMin();
+ }
+
+ @Override
+ public int getInputFileCountMax() {
+ return configHolder.getInputFileCountMax();
+ }
+
+ @Override
+ public int getPurgeInterval() {
+ return configHolder.getPurgeInterval();
+ }
+
+ @Override
+ public String getDiskStoreName() {
+ return configHolder.getDiskStoreName();
+ }
+
+ @Override
+ public int getMaxMemory() {
+ return configHolder.getMaxMemory();
+ }
+
+ @Override
+ public int getBatchSize() {
+ return configHolder.getBatchSize();
+ }
+
+ @Override
+ public int getBatchInterval() {
+ return configHolder.getBatchInterval();
+ }
+
+ @Override
+ public boolean getBufferPersistent() {
+ return configHolder.getBufferPersistent();
+ }
+
+ @Override
+ public boolean getSynchronousDiskWrite() {
+ return configHolder.getSynchronousDiskWrite();
+ }
+
+ @Override
+ public int getDispatcherThreads() {
+ return configHolder.getDispatcherThreads();
+ }
+
+ @Override
+ public HDFSStoreMutator createHdfsStoreMutator() {
+ return new HDFSStoreMutatorImpl();
+ }
+
+ public FileSystemFactory getFileSystemFactory() {
+ return new DistributedFileSystemFactory();
+ }
+
+ /*
+ * Factory to create HDFS file system instances
+ */
+ static public interface FileSystemFactory {
+ public FileSystem create(URI namenode, Configuration conf, boolean forceNew) throws IOException;
+ }
+
+ /*
+ * File system factory implementations for creating instances of file system
+ * connected to distributed HDFS cluster
+ */
+ public class DistributedFileSystemFactory implements FileSystemFactory {
+ private final boolean ALLOW_TEST_FILE_SYSTEM = Boolean.getBoolean(HoplogConfig.ALLOW_LOCAL_HDFS_PROP);
+ private final boolean USE_FS_CACHE = Boolean.getBoolean(HoplogConfig.USE_FS_CACHE);
+
+ @Override
+ public FileSystem create(URI nn, Configuration conf, boolean create) throws IOException {
+ FileSystem filesystem;
+
+ if (USE_FS_CACHE && !create) {
+ filesystem = FileSystem.get(nn, conf);
+ } else {
+ filesystem = FileSystem.newInstance(nn, conf);
+ }
+
+ if (filesystem instanceof LocalFileSystem && !ALLOW_TEST_FILE_SYSTEM) {
+ closeFileSystemIgnoreError(filesystem);
+ throw new IllegalStateException(
+ LocalizedStrings.HOPLOG_TRYING_TO_CREATE_STANDALONE_SYSTEM.toLocalizedString(getNameNodeURL()));
+ }
+
+ return filesystem;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreMutatorImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreMutatorImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreMutatorImpl.java
new file mode 100644
index 0000000..203e623
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreMutatorImpl.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import com.gemstone.gemfire.cache.hdfs.HDFSStore;
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+
+public class HDFSStoreMutatorImpl implements HDFSStoreMutator {
+ private HDFSStoreConfigHolder configHolder;
+ private Boolean autoCompact;
+ private Boolean autoMajorCompact;
+
+ public HDFSStoreMutatorImpl() {
+ configHolder = new HDFSStoreConfigHolder();
+ configHolder.resetDefaultValues();
+ }
+
+ public HDFSStoreMutatorImpl(HDFSStore store) {
+ configHolder = new HDFSStoreConfigHolder(store);
+ }
+
+ public HDFSStoreMutator setWriteOnlyFileRolloverSize(int maxFileSize) {
+ configHolder.setWriteOnlyFileRolloverSize(maxFileSize);
+ return this;
+ }
+ @Override
+ public int getWriteOnlyFileRolloverSize() {
+ return configHolder.getWriteOnlyFileRolloverSize();
+ }
+
+ @Override
+ public HDFSStoreMutator setWriteOnlyFileRolloverInterval(int count) {
+ configHolder.setWriteOnlyFileRolloverInterval(count);
+ return this;
+ }
+ @Override
+ public int getWriteOnlyFileRolloverInterval() {
+ return configHolder.getWriteOnlyFileRolloverInterval();
+ }
+
+ @Override
+ public HDFSStoreMutator setMinorCompaction(boolean auto) {
+ autoCompact = Boolean.valueOf(auto);
+ configHolder.setMinorCompaction(auto);
+ return null;
+ }
+ @Override
+ public Boolean getMinorCompaction() {
+ return autoCompact;
+ }
+
+ @Override
+ public HDFSStoreMutator setMinorCompactionThreads(int count) {
+ configHolder.setMinorCompactionThreads(count);
+ return this;
+ }
+ @Override
+ public int getMinorCompactionThreads() {
+ return configHolder.getMinorCompactionThreads();
+ }
+
+ @Override
+ public HDFSStoreMutator setMajorCompaction(boolean auto) {
+ autoMajorCompact = Boolean.valueOf(auto);
+ configHolder.setMajorCompaction(auto);
+ return this;
+ }
+ @Override
+ public Boolean getMajorCompaction() {
+ return autoMajorCompact;
+ }
+
+ @Override
+ public HDFSStoreMutator setMajorCompactionInterval(int count) {
+ configHolder.setMajorCompactionInterval(count);
+ return this;
+ }
+ @Override
+ public int getMajorCompactionInterval() {
+ return configHolder.getMajorCompactionInterval();
+ }
+
+ @Override
+ public HDFSStoreMutator setMajorCompactionThreads(int count) {
+ configHolder.setMajorCompactionThreads(count);
+ return this;
+ }
+ @Override
+ public int getMajorCompactionThreads() {
+ return configHolder.getMajorCompactionThreads();
+ }
+
+ @Override
+ public HDFSStoreMutator setInputFileSizeMax(int size) {
+ configHolder.setInputFileSizeMax(size);
+ return this;
+ }
+ @Override
+ public int getInputFileSizeMax() {
+ return configHolder.getInputFileSizeMax();
+ }
+
+ @Override
+ public HDFSStoreMutator setInputFileCountMin(int count) {
+ configHolder.setInputFileCountMin(count);
+ return this;
+ }
+ @Override
+ public int getInputFileCountMin() {
+ return configHolder.getInputFileCountMin();
+ }
+
+ @Override
+ public HDFSStoreMutator setInputFileCountMax(int count) {
+ configHolder.setInputFileCountMax(count);
+ return this;
+ }
+ @Override
+ public int getInputFileCountMax() {
+ return configHolder.getInputFileCountMax();
+ }
+
+ @Override
+ public HDFSStoreMutator setPurgeInterval(int interval) {
+ configHolder.setPurgeInterval(interval);
+ return this;
+ }
+ @Override
+ public int getPurgeInterval() {
+ return configHolder.getPurgeInterval();
+ }
+
+ @Override
+ public int getBatchSize() {
+ return configHolder.batchSize;
+ }
+ @Override
+ public HDFSStoreMutator setBatchSize(int size) {
+ configHolder.setBatchSize(size);
+ return this;
+ }
+
+
+ @Override
+ public int getBatchInterval() {
+ return configHolder.batchIntervalMillis;
+ }
+ @Override
+ public HDFSStoreMutator setBatchInterval(int interval) {
+ configHolder.setBatchInterval(interval);
+ return this;
+ }
+
+ public static void assertIsPositive(String name, int count) {
+ if (count < 1) {
+ throw new IllegalArgumentException(
+ LocalizedStrings.DiskWriteAttributesImpl_0_HAS_TO_BE_POSITIVE_NUMBER_AND_THE_VALUE_GIVEN_1_IS_NOT_ACCEPTABLE
+ .toLocalizedString(new Object[] { name, count }));
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("HDFSStoreMutatorImpl [");
+ if (configHolder != null) {
+ builder.append("configHolder=");
+ builder.append(configHolder);
+ builder.append(", ");
+ }
+ if (autoCompact != null) {
+ builder.append("MinorCompaction=");
+ builder.append(autoCompact);
+ builder.append(", ");
+ }
+ if (getMajorCompaction() != null) {
+ builder.append("autoMajorCompaction=");
+ builder.append(getMajorCompaction());
+ builder.append(", ");
+ }
+ builder.append("]");
+ return builder.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSWriteOnlyStoreEventListener.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSWriteOnlyStoreEventListener.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSWriteOnlyStoreEventListener.java
new file mode 100644
index 0000000..0298523
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSWriteOnlyStoreEventListener.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEvent;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer;
+import com.gemstone.gemfire.i18n.LogWriterI18n;
+import com.gemstone.gemfire.internal.cache.BucketRegion;
+import com.gemstone.gemfire.internal.cache.ForceReattemptException;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.PrimaryBucketException;
+import com.gemstone.gemfire.internal.cache.execute.BucketMovedException;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+
+/**
+ * Listener that persists events to a write only HDFS store
+ *
+ */
+public class HDFSWriteOnlyStoreEventListener implements
+ AsyncEventListener {
+
+ private final LogWriterI18n logger;
+ private volatile boolean senderStopped = false;
+ private final FailureTracker failureTracker = new FailureTracker(10L, 60 * 1000L, 1.5f);
+
+
+ public HDFSWriteOnlyStoreEventListener(LogWriterI18n logger) {
+ this.logger = logger;
+ }
+
+ @Override
+ public void close() {
+ senderStopped = true;
+ }
+
+ @Override
+ public boolean processEvents(List<AsyncEvent> events) {
+ if (Hoplog.NOP_WRITE) {
+ return true;
+ }
+
+ if (logger.fineEnabled())
+ logger.fine("HDFSWriteOnlyStoreEventListener: A total of " + events.size() + " events are sent from GemFire to persist on HDFS");
+ boolean success = false;
+ try {
+ failureTracker.sleepIfRetry();
+ HDFSGatewayEventImpl hdfsEvent = null;
+ int previousBucketId = -1;
+ BatchManager bm = null;
+ for (AsyncEvent asyncEvent : events) {
+ if (senderStopped){
+ if (logger.fineEnabled()) {
+ logger.fine("HDFSWriteOnlyStoreEventListener.processEvents: Cache is closing down. Ignoring the batch of data.");
+ }
+ return false;
+ }
+ hdfsEvent = (HDFSGatewayEventImpl)asyncEvent;
+ if (previousBucketId != hdfsEvent.getBucketId()){
+ if (previousBucketId != -1)
+ persistBatch(bm, previousBucketId);
+
+ previousBucketId = hdfsEvent.getBucketId();
+ bm = new BatchManager();
+ }
+ bm.addEvent(hdfsEvent);
+ }
+ try {
+ persistBatch(bm, hdfsEvent.getBucketId());
+ } catch (BucketMovedException e) {
+ logger.fine("Batch could not be written to HDFS as the bucket moved. bucket id: " +
+ hdfsEvent.getBucketId() + " Exception: " + e);
+ return false;
+ }
+ success = true;
+ } catch (IOException e) {
+ logger.warning(LocalizedStrings.HOPLOG_FLUSH_FOR_BATCH_FAILED, e);
+ return false;
+ }
+ catch (ClassNotFoundException e) {
+ logger.warning(LocalizedStrings.HOPLOG_FLUSH_FOR_BATCH_FAILED, e);
+ return false;
+ }
+ catch (CacheClosedException e) {
+ // exit silently
+ if (logger.fineEnabled())
+ logger.fine(e);
+ return false;
+ } catch (ForceReattemptException e) {
+ if (logger.fineEnabled())
+ logger.fine(e);
+ return false;
+ } catch (InterruptedException e1) {
+ // TODO Auto-generated catch block
+ e1.printStackTrace();
+ } finally {
+ failureTracker.record(success);
+ }
+ return true;
+ }
+
+ /**
+ * Persists batches of multiple regions specified by the batch manager
+ *
+ */
+ private void persistBatch(BatchManager bm, int bucketId) throws IOException, ForceReattemptException {
+ Iterator<Map.Entry<LocalRegion,ArrayList<QueuedPersistentEvent>>> eventsPerRegion =
+ bm.iterator();
+ HoplogOrganizer bucketOrganizer = null;
+ while (eventsPerRegion.hasNext()) {
+ Map.Entry<LocalRegion, ArrayList<QueuedPersistentEvent>> eventsForARegion = eventsPerRegion.next();
+ bucketOrganizer = getOrganizer((PartitionedRegion) eventsForARegion.getKey(), bucketId);
+ // bucket organizer cannot be null.
+ if (bucketOrganizer == null)
+ throw new BucketMovedException("Bucket moved. BucketID: " + bucketId + " HdfsRegion: " + eventsForARegion.getKey().getName());
+ bucketOrganizer.flush(eventsForARegion.getValue().iterator(), eventsForARegion.getValue().size());
+ if (logger.fineEnabled()) {
+ logger.fine("Batch written to HDFS of size " + eventsForARegion.getValue().size() +
+ " for region " + eventsForARegion.getKey());
+ }
+ }
+ }
+
+ private HoplogOrganizer getOrganizer(PartitionedRegion region, int bucketId) {
+ BucketRegion br = region.getDataStore().getLocalBucketById(bucketId);
+ if (br == null) {
+ // got rebalanced or something
+ throw new BucketMovedException("Bucket region is no longer available. BucketId: "+
+ bucketId + " HdfsRegion: " + region.getName());
+ }
+
+ return br.getHoplogOrganizer();
+ }
+
+ /**
+ * Sorts out events of the multiple regions into lists per region
+ *
+ */
+ private class BatchManager implements Iterable<Map.Entry<LocalRegion,ArrayList<QueuedPersistentEvent>>> {
+ private HashMap<LocalRegion, ArrayList<QueuedPersistentEvent>> regionBatches =
+ new HashMap<LocalRegion, ArrayList<QueuedPersistentEvent>>();
+
+ public void addEvent (HDFSGatewayEventImpl hdfsEvent) throws IOException, ClassNotFoundException {
+ LocalRegion region = (LocalRegion) hdfsEvent.getRegion();
+ ArrayList<QueuedPersistentEvent> regionList = regionBatches.get(region);
+ if (regionList == null) {
+ regionList = new ArrayList<QueuedPersistentEvent>();
+ regionBatches.put(region, regionList);
+ }
+ regionList.add(new UnsortedHDFSQueuePersistedEvent(hdfsEvent));
+ }
+
+ @Override
+ public Iterator<Map.Entry<LocalRegion,ArrayList<QueuedPersistentEvent>>> iterator() {
+ return regionBatches.entrySet().iterator();
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HoplogListenerForRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HoplogListenerForRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HoplogListenerForRegion.java
new file mode 100644
index 0000000..c7ba23f
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HoplogListenerForRegion.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogListener;
+
+/**
+ * Objects of this class needs to be created for every region. These objects
+ * listen to the oplog events and take appropriate action.
+ *
+ */
+public class HoplogListenerForRegion implements HoplogListener {
+
+ private List<HoplogListener> otherListeners = new CopyOnWriteArrayList<HoplogListener>();
+
+ public HoplogListenerForRegion() {
+
+ }
+
+ @Override
+ public void hoplogCreated(String regionFolder, int bucketId,
+ Hoplog... oplogs) throws IOException {
+ for (HoplogListener listener : this.otherListeners) {
+ listener.hoplogCreated(regionFolder, bucketId, oplogs);
+ }
+ }
+
+ @Override
+ public void hoplogDeleted(String regionFolder, int bucketId,
+ Hoplog... oplogs) {
+ for (HoplogListener listener : this.otherListeners) {
+ try {
+ listener.hoplogDeleted(regionFolder, bucketId, oplogs);
+ } catch (IOException e) {
+ // TODO handle
+ throw new HDFSIOException(e.getLocalizedMessage(), e);
+ }
+ }
+ }
+
+ public void addListener(HoplogListener listener) {
+ this.otherListeners.add(listener);
+ }
+
+ @Override
+ public void compactionCompleted(String region, int bucket, boolean isMajor) {
+ for (HoplogListener listener : this.otherListeners) {
+ listener.compactionCompleted(region, bucket, isMajor);
+ }
+ }
+}