Posted to commits@geode.apache.org by ag...@apache.org on 2016/05/03 23:45:28 UTC
[14/60] [abbrv] incubator-geode git commit: GEODE-1072: Removing HDFS
related code
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSParallelGatewaySenderQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSParallelGatewaySenderQueue.java
deleted file mode 100644
index 1e6a034..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSParallelGatewaySenderQueue.java
+++ /dev/null
@@ -1,471 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-
-import com.gemstone.gemfire.cache.CacheException;
-import com.gemstone.gemfire.cache.EntryNotFoundException;
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.i18n.LogWriterI18n;
-import com.gemstone.gemfire.internal.SystemTimer;
-import com.gemstone.gemfire.internal.SystemTimer.SystemTimerTask;
-import com.gemstone.gemfire.internal.cache.ColocationHelper;
-import com.gemstone.gemfire.internal.cache.ForceReattemptException;
-import com.gemstone.gemfire.internal.cache.LocalRegion;
-import com.gemstone.gemfire.internal.cache.PartitionedRegion;
-import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
-import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl;
-import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
-
-/**
- * Parallel Gateway Sender Queue extended for HDFS functionality
- *
- */
-public class HDFSParallelGatewaySenderQueue extends ParallelGatewaySenderQueue {
-
- private int currentBucketIndex = 0;
- private int elementsPeekedAcrossBuckets = 0;
- private SystemTimer rollListTimer = null;
- public static final String ROLL_SORTED_LIST_TIME_INTERVAL_MS__PROP = "gemfire.ROLL_SORTED_LIST_TIME_INTERVAL_MS";
- private final int ROLL_SORTED_LIST_TIME_INTERVAL_MS = Integer.getInteger(ROLL_SORTED_LIST_TIME_INTERVAL_MS__PROP, 3000);
-
- public HDFSParallelGatewaySenderQueue(AbstractGatewaySender sender,
- Set<Region> userPRs, int idx, int nDispatcher) {
-
- super(sender, userPRs, idx, nDispatcher);
- // Only the first dispatcher (index 0) schedules the roll task.
- if (sender.getBucketSorted() && this.index == 0) {
- rollListTimer = new SystemTimer(sender.getCache().getDistributedSystem(),
- true);
- // schedule the task to roll the skip lists
- rollListTimer.scheduleAtFixedRate(new RollSortedListsTimerTask(),
- ROLL_SORTED_LIST_TIME_INTERVAL_MS, ROLL_SORTED_LIST_TIME_INTERVAL_MS);
- }
- }
-
- @Override
- public Object peek() throws InterruptedException, CacheException {
- /* If you call peek and use super.peek it leads to the following exception.
- * So I'm adding an explicit UnsupportedOperationException.
- Caused by: java.lang.ClassCastException: com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue cannot be cast to com.gemstone.gemfire.internal.cache.BucketRegionQueue
- at com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue.getRandomPrimaryBucket(ParallelGatewaySenderQueue.java:964)
- at com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue.peek(ParallelGatewaySenderQueue.java:1078)
- */
- throw new UnsupportedOperationException();
- }
-
-
- @Override
- public void cleanUp() {
- super.cleanUp();
- cancelRollListTimer();
- }
-
- private void cancelRollListTimer() {
- if (rollListTimer != null) {
- rollListTimer.cancel();
- rollListTimer = null;
- }
- }
- /**
- * A call to this function peeks elements from the first local primary
- * bucket. The next call peeks elements from the next local primary
- * bucket, and so on.
- */
- @Override
- public List peek(int batchSize, int timeToWait) throws InterruptedException,
- CacheException {
-
- List batch = new ArrayList();
-
- int batchSizeInBytes = batchSize*1024*1024;
- PartitionedRegion prQ = getRandomShadowPR();
- if (prQ == null || prQ.getLocalMaxMemory() == 0) {
- try {
- Thread.sleep(50);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- blockProcesorThreadIfRequired();
- return batch;
- }
-
- ArrayList list = null;
- ArrayList<Integer> pbuckets = new ArrayList<Integer>(prQ
- .getDataStore().getAllLocalPrimaryBucketIds());
- ArrayList<Integer> buckets = new ArrayList<Integer>();
- for(Integer i : pbuckets) {
- if(i % this.nDispatcher == this.index)
- buckets.add(i);
- }
- // In case of failures, peekedEvents may still hold some elements;
- // add them to the batch.
- if (this.resetLastPeeked) {
- int previousBucketId = -1;
- boolean stillPrimary = true;
- Iterator<GatewaySenderEventImpl> iter = peekedEvents.iterator();
- // we need to remove the events of any bucket that is no longer primary on
- // this node, as they cannot be persisted from this node.
- while(iter.hasNext()) {
- HDFSGatewayEventImpl hdfsEvent = (HDFSGatewayEventImpl)iter.next();
- if (previousBucketId != hdfsEvent.getBucketId()){
- stillPrimary = buckets.contains(hdfsEvent.getBucketId());
- previousBucketId = hdfsEvent.getBucketId();
- }
- if (stillPrimary)
- batch.add(hdfsEvent);
- else {
- iter.remove();
- }
- }
- this.resetLastPeeked = false;
- }
-
- if (buckets.size() == 0) {
- // No local primary buckets to process; sleep a bit before trying again.
- try {
- Thread.sleep(50);
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- return batch;
- }
-
- if (this.sender.getBucketSorted()) {
-
- }
-
- // Each call to this function returns the index of the next bucket
- // to be processed, preserving the bucket sequence across a series
- // of peek calls.
- // If buckets move between two consecutive calls, a bucket may be
- // processed twice while another is skipped. That is acceptable,
- // because the skipped bucket will be processed in the next round.
- Integer bIdIndex = getCurrentBucketIndex(buckets.size());
-
- // If we have gone through all the buckets once and no elements were
- // peeked from any of them, take a nap. This always sleeps on the first
- // call, but that should be fine because in practical use cases
- // timeToWait is greater than this 100 ms sleep.
- if (bIdIndex == 0 && getAndresetElementsPeekedAcrossBuckets() == 0) {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-
- HDFSBucketRegionQueue hrq = ((HDFSBucketRegionQueue)prQ
- .getDataStore().getLocalBucketById(buckets.get(bIdIndex)));
-
- if (hrq == null) {
- // bucket moved to another node after getAllLocalPrimaryBucketIds
- // was called. Peeking not possible. return.
- return batch;
- }
- long entriesWaitingTobePeeked = hrq.totalEntries();
-
- if (entriesWaitingTobePeeked == 0) {
- blockProcesorThreadIfRequired();
- return batch;
- }
-
- long currentTimeInMillis = System.currentTimeMillis();
- long bucketSizeInBytes = hrq.getQueueSizeInBytes();
- if (((currentTimeInMillis - hrq.getLastPeekTimeInMillis()) > timeToWait)
- || ( bucketSizeInBytes > batchSizeInBytes)
- || hrq.shouldDrainImmediately()) {
- // peek now
- if (logger.isDebugEnabled()) {
- logger.debug("Peeking queue " + hrq.getId() + ": bucketSizeInBytes " + bucketSizeInBytes
- + ": batchSizeInBytes" + batchSizeInBytes
- + ": timeToWait" + timeToWait
- + ": (currentTimeInMillis - hrq.getLastPeekTimeInMillis())" + (currentTimeInMillis - hrq.getLastPeekTimeInMillis()));
- }
-
- list = peekAhead(buckets.get(bIdIndex), hrq);
-
- if (list != null && list.size() != 0 ) {
- for (Object object : list) {
- batch.add(object);
- peekedEvents.add((HDFSGatewayEventImpl)object);
- }
- }
- }
- else {
- blockProcesorThreadIfRequired();
- }
- if (logger.isDebugEnabled() && batch.size() > 0) {
- logger.debug(this + ": Peeked a batch of " + batch.size() + " entries");
- }
-
- setElementsPeekedAcrossBuckets(batch.size());
-
- return batch;
- }
-
- /**
- * Maintains an index of the last processed bucket; each call returns the
- * index of the next bucket to process, wrapping around to 0.
- * @param totalBuckets the number of local primary buckets to cycle through
- * @return the index of the bucket to process in this call
- */
- private int getCurrentBucketIndex(int totalBuckets) {
- int retBucket = currentBucketIndex;
- if (retBucket >= totalBuckets) {
- currentBucketIndex = 0;
- retBucket = 0;
- }
-
- currentBucketIndex++;
-
- return retBucket;
- }
-
- @Override
- public void remove(int batchSize) throws CacheException {
- int destroyed = 0;
- HDFSGatewayEventImpl event = null;
-
- if (this.peekedEvents.size() > 0)
- event = (HDFSGatewayEventImpl)this.peekedEvents.remove();
-
- while (event != null && destroyed < batchSize) {
- Region currentRegion = event.getRegion();
- int currentBucketId = event.getBucketId();
- int bucketId = event.getBucketId();
-
- ArrayList<HDFSGatewayEventImpl> listToDestroy = new ArrayList<HDFSGatewayEventImpl>();
- ArrayList<Object> destroyedSeqNum = new ArrayList<Object>();
-
- // create a batch of all the entries of a bucket
- while (bucketId == currentBucketId) {
- listToDestroy.add(event);
- destroyedSeqNum.add(event.getShadowKey());
- destroyed++;
-
- if (this.peekedEvents.size() == 0 || (destroyed) >= batchSize) {
- event = null;
- break;
- }
-
- event = (HDFSGatewayEventImpl)this.peekedEvents.remove();
-
- bucketId = event.getBucketId();
-
- if (!this.sender.isRunning()){
- if (logger.isDebugEnabled()) {
- logger.debug("ParallelGatewaySenderQueue#remove: Cache is closing down. Ignoring remove request.");
- }
- return;
- }
- }
- try {
- HDFSBucketRegionQueue brq = getBucketRegionQueue((PartitionedRegion) currentRegion, currentBucketId);
-
- if (brq != null) {
- // destroy the entries from the bucket
- brq.destroyKeys(listToDestroy);
- // Add the removed events to the map for the BatchRemovalMessage.
- // We need to provide the prQ because there can be multiple
- // queues in a PGS now.
- PartitionedRegion prQ = brq.getPartitionedRegion();
- addRemovedEvents(prQ, currentBucketId, destroyedSeqNum);
- }
-
- } catch (ForceReattemptException e) {
- if (logger.isDebugEnabled()) {
- logger.debug("ParallelGatewaySenderQueue#remove: " + "Got ForceReattemptException for " + this
- + " for bucket = " + bucketId);
- }
- }
- catch(EntryNotFoundException e) {
- if (logger.isDebugEnabled()) {
- logger.debug("ParallelGatewaySenderQueue#remove: " + "Got EntryNotFoundException for " + this
- + " for bucket = " + bucketId );
- }
- }
- }
- }
-
- /**
- * Keeps track of the number of elements peeked across all buckets.
- */
- private void setElementsPeekedAcrossBuckets(int peekedElements) {
- this.elementsPeekedAcrossBuckets += peekedElements;
- }
-
- /**
- * Returns the number of elements peeked across all buckets and
- * resets the counter.
- */
- private int getAndresetElementsPeekedAcrossBuckets() {
- int peekedElements = this.elementsPeekedAcrossBuckets;
- this.elementsPeekedAcrossBuckets = 0;
- return peekedElements;
- }
-
- @Override
- public void remove() throws CacheException {
- throw new UnsupportedOperationException("Method HDFSParallelGatewaySenderQueue#remove is not supported");
- }
-
- @Override
- public void put(Object object) throws InterruptedException, CacheException {
- super.put(object);
- }
-
- protected ArrayList peekAhead(int bucketId, HDFSBucketRegionQueue hrq) throws CacheException {
-
- if (logger.isDebugEnabled()) {
- logger.debug(this + ": Peekahead for the bucket " + bucketId);
- }
- ArrayList list = hrq.peekABatch();
- if (logger.isDebugEnabled() && list != null ) {
- logger.debug(this + ": Peeked " + list.size() + " objects from bucket " + bucketId);
- }
-
- return list;
- }
-
- @Override
- public Object take() {
- throw new UnsupportedOperationException("take() is not supported for " + HDFSParallelGatewaySenderQueue.class.toString());
- }
-
- protected boolean isUsedForHDFS() {
- return true;
- }
-
- @Override
- protected void afterRegionAdd (PartitionedRegion userPR) {
- }
-
- /**
- * Gets the value for the given region key from the HDFSBucketRegionQueue.
- * @param region the partitioned region the key belongs to
- * @param regionKey the serialized region key to look up
- * @param bucketId the bucket that hosts the key
- * @return the matching event, or null if the key is not found
- * @throws ForceReattemptException if the bucket is not available locally
- */
- public HDFSGatewayEventImpl get(PartitionedRegion region, byte[] regionKey, int bucketId) throws ForceReattemptException {
- try {
- HDFSBucketRegionQueue brq = getBucketRegionQueue(region, bucketId);
-
- if (brq == null)
- return null;
-
- return brq.getObjectForRegionKey(region, regionKey);
- } catch(EntryNotFoundException e) {
- if (logger.isDebugEnabled()) {
- logger.debug("HDFSParallelGatewaySenderQueue#get: " + "Got EntryNotFoundException for " + this
- + " for bucket = " + bucketId);
- }
- }
- return null;
- }
-
- @Override
- public void clear(PartitionedRegion pr, int bucketId) {
- HDFSBucketRegionQueue brq;
- try {
- brq = getBucketRegionQueue(pr, bucketId);
- if (brq == null)
- return;
- brq.clear();
- } catch (ForceReattemptException e) {
- //do nothing, bucket was destroyed.
- }
- }
-
- @Override
- public int size(PartitionedRegion pr, int bucketId) throws ForceReattemptException {
- HDFSBucketRegionQueue hq = getBucketRegionQueue(pr, bucketId);
- return hq.size();
- }
-
- public HDFSBucketRegionQueue getBucketRegionQueue(PartitionedRegion region,
- int bucketId) throws ForceReattemptException {
- PartitionedRegion leader = ColocationHelper.getLeaderRegion(region);
- if (leader == null)
- return null;
- String leaderregionPath = leader.getFullPath();
- PartitionedRegion prQ = this.userRegionNameToshadowPRMap.get(leaderregionPath);
- if (prQ == null)
- return null;
- HDFSBucketRegionQueue brq;
-
- brq = ((HDFSBucketRegionQueue)prQ.getDataStore()
- .getLocalBucketById(bucketId));
- if(brq == null) {
- prQ.getRegionAdvisor().waitForLocalBucketStorage(bucketId);
- }
- brq = ((HDFSBucketRegionQueue)prQ.getDataStore()
- .getInitializedBucketForId(null, bucketId));
- return brq;
- }
-
- /**
- * This class is responsible for rolling the lists of the sorted event
- * queue. Rolling the lists on a separate thread is required because
- * neither the put thread nor the peek/remove thread can do it. The put
- * thread cannot, because that would require synchronization with other
- * put threads and the peek thread, hampering put latency. The peek
- * thread cannot, because if the event insert rate is high enough the
- * lists can grow far beyond their intended size between peeks.
- *
- */
- class RollSortedListsTimerTask extends SystemTimerTask {
-
-
- /**
- * Ensures that any bucket whose lists have grown beyond their intended
- * size gets them rolled over into new skip lists.
- */
- @Override
- public void run2() {
- Set<PartitionedRegion> prQs = getRegions();
- for (PartitionedRegion prQ : prQs) {
- ArrayList<Integer> buckets = new ArrayList<Integer>(prQ
- .getDataStore().getAllLocalPrimaryBucketIds());
- for (Integer bId : buckets) {
- HDFSBucketRegionQueue hrq = ((HDFSBucketRegionQueue)prQ
- .getDataStore().getLocalBucketById(bId));
- if (hrq == null) {
- // bucket moved to another node after getAllLocalPrimaryBucketIds
- // was called. Continue with the next bucket.
- continue;
- }
- if (logger.isDebugEnabled()) {
- logger.debug("Rolling over the list for bucket id: " + bId);
- }
- hrq.rolloverSkipList();
- }
- }
- }
- }
-
-}
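
The round-robin bucket selection that the deleted peek() and getCurrentBucketIndex() comments describe reduces to a wrap-around cursor. A minimal, self-contained sketch of that logic follows; the class and method names are illustrative, not Geode API:

public class RoundRobinCursor {
  private int current = 0;

  // Returns the next index in [0, totalBuckets), wrapping back to 0 once
  // the cursor runs past the end, mirroring getCurrentBucketIndex above.
  public int next(int totalBuckets) {
    if (current >= totalBuckets) {
      current = 0;
    }
    return current++;
  }

  public static void main(String[] args) {
    RoundRobinCursor cursor = new RoundRobinCursor();
    for (int i = 0; i < 6; i++) {
      System.out.print(cursor.next(3) + " "); // prints: 0 1 2 0 1 2
    }
  }
}

If totalBuckets shrinks between calls (a bucket moved to another node), the cursor may repeat or skip an index; as the deleted comments note, that is tolerated because the next round covers the skipped bucket.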
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreConfigHolder.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreConfigHolder.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreConfigHolder.java
deleted file mode 100644
index 16d3d87..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreConfigHolder.java
+++ /dev/null
@@ -1,559 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import java.io.Serializable;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.GemFireConfigException;
-import com.gemstone.gemfire.cache.hdfs.HDFSStore;
-import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory;
-import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator;
-import com.gemstone.gemfire.cache.hdfs.StoreExistsException;
-import com.gemstone.gemfire.internal.cache.xmlcache.CacheXml;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.logging.LogService;
-
-
-/**
- * Holds all HDFS store related configuration. Instead of duplicating the
- * same members in two different classes (factory and store), this single
- * class is used: HDFSStoreImpl and HDFSStoreCreation delegate their get
- * calls, set calls, and copy-constructor calls to it. Moreover, the config
- * holder can be replaced wholesale to support altering the configuration.
- *
- */
-public class HDFSStoreConfigHolder implements HDFSStore, HDFSStoreFactory, Serializable {
- private String name = null;
- private String namenodeURL = null;
- private String homeDir = DEFAULT_HOME_DIR;
- private String clientConfigFile = null;
- private float blockCacheSize = DEFAULT_BLOCK_CACHE_SIZE;
- private int maxFileSize = DEFAULT_WRITE_ONLY_FILE_SIZE_LIMIT;
- private int fileRolloverInterval = DEFAULT_WRITE_ONLY_FILE_ROLLOVER_INTERVAL;
- protected boolean isAutoCompact = DEFAULT_MINOR_COMPACTION;
- protected boolean autoMajorCompact = DEFAULT_MAJOR_COMPACTION;
- protected int maxConcurrency = DEFAULT_MINOR_COMPACTION_THREADS;
- protected int majorCompactionConcurrency = DEFAULT_MAJOR_COMPACTION_THREADS;
- protected int majorCompactionIntervalMins = DEFAULT_MAJOR_COMPACTION_INTERVAL_MINS;
- protected int maxInputFileSizeMB = DEFAULT_INPUT_FILE_SIZE_MAX_MB;
- protected int maxInputFileCount = DEFAULT_INPUT_FILE_COUNT_MAX;
- protected int minInputFileCount = DEFAULT_INPUT_FILE_COUNT_MIN;
- protected int oldFileCleanupIntervalMins = DEFAULT_OLD_FILE_CLEANUP_INTERVAL_MINS;
-
- protected int batchSize = DEFAULT_BATCH_SIZE_MB;
- protected int batchIntervalMillis = DEFAULT_BATCH_INTERVAL_MILLIS;
- protected int maximumQueueMemory = DEFAULT_MAX_BUFFER_MEMORY;
- protected boolean isPersistenceEnabled = DEFAULT_BUFFER_PERSISTANCE;
- protected String diskStoreName = null;
- protected boolean diskSynchronous = DEFAULT_DISK_SYNCHRONOUS;
- protected int dispatcherThreads = DEFAULT_DISPATCHER_THREADS;
-
- private static final Logger logger = LogService.getLogger();
- protected final String logPrefix;
-
- public HDFSStoreConfigHolder() {
- this(null);
- }
-
- /**
- * @param config configuration source for creating this instance
- */
- public HDFSStoreConfigHolder(HDFSStore config) {
- if (config != null) {
- this.name = config.getName();
- }
- // Build the prefix after copying the name so it does not read "<null>".
- this.logPrefix = "<" + getName() + "> ";
- if (config == null) {
- return;
- }
-
- this.namenodeURL = config.getNameNodeURL();
- this.homeDir = config.getHomeDir();
- this.clientConfigFile = config.getHDFSClientConfigFile();
- this.blockCacheSize = config.getBlockCacheSize();
- this.maxFileSize = config.getWriteOnlyFileRolloverSize();
- this.fileRolloverInterval = config.getWriteOnlyFileRolloverInterval();
- isAutoCompact = config.getMinorCompaction();
- maxConcurrency = config.getMinorCompactionThreads();
- autoMajorCompact = config.getMajorCompaction();
- majorCompactionConcurrency = config.getMajorCompactionThreads();
- majorCompactionIntervalMins = config.getMajorCompactionInterval();
- maxInputFileSizeMB = config.getInputFileSizeMax();
- maxInputFileCount = config.getInputFileCountMax();
- minInputFileCount = config.getInputFileCountMin();
- oldFileCleanupIntervalMins = config.getPurgeInterval();
-
- batchSize = config.getBatchSize();
- batchIntervalMillis = config.getBatchInterval();
- maximumQueueMemory = config.getMaxMemory();
- isPersistenceEnabled = config.getBufferPersistent();
- diskStoreName = config.getDiskStoreName();
- diskSynchronous = config.getSynchronousDiskWrite();
- dispatcherThreads = config.getDispatcherThreads();
- }
-
- public void resetDefaultValues() {
- name = null;
- namenodeURL = null;
- homeDir = null;
- clientConfigFile = null;
- blockCacheSize = -1f;
- maxFileSize = -1;
- fileRolloverInterval = -1;
-
- isAutoCompact = false;
- maxConcurrency = -1;
- maxInputFileSizeMB = -1;
- maxInputFileCount = -1;
- minInputFileCount = -1;
- oldFileCleanupIntervalMins = -1;
-
- autoMajorCompact = false;
- majorCompactionConcurrency = -1;
- majorCompactionIntervalMins = -1;
-
- batchSize = -1;
- batchIntervalMillis = -1;
- maximumQueueMemory = -1;
- isPersistenceEnabled = false;
- diskStoreName = null;
- diskSynchronous = false;
- dispatcherThreads = -1;
- }
-
- public void copyFrom(HDFSStoreMutator mutator) {
- if (mutator.getWriteOnlyFileRolloverInterval() >= 0) {
- logAttrMutation("fileRolloverInterval", mutator.getWriteOnlyFileRolloverInterval());
- setWriteOnlyFileRolloverInterval(mutator.getWriteOnlyFileRolloverInterval());
- }
- if (mutator.getWriteOnlyFileRolloverSize() >= 0) {
- logAttrMutation("MaxFileSize", mutator.getWriteOnlyFileRolloverInterval());
- setWriteOnlyFileRolloverSize(mutator.getWriteOnlyFileRolloverSize());
- }
-
- if (mutator.getMinorCompaction() != null) {
- logAttrMutation("MinorCompaction", mutator.getMinorCompaction());
- setMinorCompaction(mutator.getMinorCompaction());
- }
-
- if (mutator.getMinorCompactionThreads() >= 0) {
- logAttrMutation("MaxThreads", mutator.getMinorCompactionThreads());
- setMinorCompactionThreads(mutator.getMinorCompactionThreads());
- }
-
- if (mutator.getMajorCompactionInterval() > -1) {
- logAttrMutation("MajorCompactionIntervalMins", mutator.getMajorCompactionInterval());
- setMajorCompactionInterval(mutator.getMajorCompactionInterval());
- }
- if (mutator.getMajorCompactionThreads() >= 0) {
- logAttrMutation("MajorCompactionMaxThreads", mutator.getMajorCompactionThreads());
- setMajorCompactionThreads(mutator.getMajorCompactionThreads());
- }
- if (mutator.getMajorCompaction() != null) {
- logAttrMutation("AutoMajorCompaction", mutator.getMajorCompaction());
- setMajorCompaction(mutator.getMajorCompaction());
- }
- if (mutator.getInputFileCountMax() >= 0) {
- logAttrMutation("maxInputFileCount", mutator.getInputFileCountMax());
- setInputFileCountMax(mutator.getInputFileCountMax());
- }
- if (mutator.getInputFileSizeMax() >= 0) {
- logAttrMutation("MaxInputFileSizeMB", mutator.getInputFileSizeMax());
- setInputFileSizeMax(mutator.getInputFileSizeMax());
- }
- if (mutator.getInputFileCountMin() >= 0) {
- logAttrMutation("MinInputFileCount", mutator.getInputFileCountMin());
- setInputFileCountMin(mutator.getInputFileCountMin());
- }
- if (mutator.getPurgeInterval() >= 0) {
- logAttrMutation("OldFilesCleanupIntervalMins", mutator.getPurgeInterval());
- setPurgeInterval(mutator.getPurgeInterval());
- }
-
- if (mutator.getBatchSize() >= 0) {
- logAttrMutation("batchSizeMB", mutator.getWriteOnlyFileRolloverInterval());
- setBatchSize(mutator.getBatchSize());
- }
- if (mutator.getBatchInterval() >= 0) {
- logAttrMutation("batchTimeInterval", mutator.getWriteOnlyFileRolloverInterval());
- setBatchInterval(mutator.getBatchInterval());
- }
- }
-
- void logAttrMutation(String name, Object value) {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Alter " + name + ":" + value, logPrefix);
- }
- }
-
- @Override
- public String getName() {
- return name;
- }
- @Override
- public HDFSStoreFactory setName(String name) {
- this.name = name;
- return this;
- }
-
- @Override
- public String getNameNodeURL() {
- return namenodeURL;
- }
- @Override
- public HDFSStoreFactory setNameNodeURL(String namenodeURL) {
- this.namenodeURL = namenodeURL;
- return this;
- }
-
- @Override
- public String getHomeDir() {
- return homeDir;
- }
- @Override
- public HDFSStoreFactory setHomeDir(String homeDir) {
- this.homeDir = homeDir;
- return this;
- }
-
- @Override
- public String getHDFSClientConfigFile() {
- return clientConfigFile;
- }
- @Override
- public HDFSStoreFactory setHDFSClientConfigFile(String clientConfigFile) {
- this.clientConfigFile = clientConfigFile;
- return this;
- }
-
- @Override
- public HDFSStoreFactory setBlockCacheSize(float percentage) {
- if(percentage < 0 || percentage > 100) {
- throw new IllegalArgumentException("Block cache size must be between 0 and 100, inclusive");
- }
- this.blockCacheSize = percentage;
- return this;
- }
-
- @Override
- public float getBlockCacheSize() {
- return blockCacheSize;
- }
-
- @Override
- public HDFSStoreFactory setWriteOnlyFileRolloverSize(int maxFileSize) {
- assertIsPositive(CacheXml.HDFS_WRITE_ONLY_FILE_ROLLOVER_INTERVAL, maxFileSize);
- this.maxFileSize = maxFileSize;
- return this;
- }
- @Override
- public int getWriteOnlyFileRolloverSize() {
- return maxFileSize;
- }
-
- @Override
- public HDFSStoreFactory setWriteOnlyFileRolloverInterval(int count) {
- assertIsPositive(CacheXml.HDFS_TIME_FOR_FILE_ROLLOVER, count);
- this.fileRolloverInterval = count;
- return this;
- }
- @Override
- public int getWriteOnlyFileRolloverInterval() {
- return fileRolloverInterval;
- }
-
- @Override
- public boolean getMinorCompaction() {
- return isAutoCompact;
- }
- @Override
- public HDFSStoreFactory setMinorCompaction(boolean auto) {
- this.isAutoCompact = auto;
- return this;
- }
-
- @Override
- public HDFSStoreFactory setMinorCompactionThreads(int count) {
- assertIsPositive(CacheXml.HDFS_MINOR_COMPACTION_THREADS, count);
- this.maxConcurrency = count;
- return this;
- }
- @Override
- public int getMinorCompactionThreads() {
- return maxConcurrency;
- }
-
- @Override
- public HDFSStoreFactory setMajorCompaction(boolean auto) {
- this.autoMajorCompact = auto;
- return this;
- }
- @Override
- public boolean getMajorCompaction() {
- return autoMajorCompact;
- }
-
- @Override
- public HDFSStoreFactory setMajorCompactionInterval(int count) {
- HDFSStoreCreation.assertIsPositive(CacheXml.HDFS_MAJOR_COMPACTION_INTERVAL, count);
- this.majorCompactionIntervalMins = count;
- return this;
- }
- @Override
- public int getMajorCompactionInterval() {
- return majorCompactionIntervalMins;
- }
-
- @Override
- public HDFSStoreFactory setMajorCompactionThreads(int count) {
- HDFSStoreCreation.assertIsPositive(CacheXml.HDFS_MAJOR_COMPACTION_THREADS, count);
- this.majorCompactionConcurrency = count;
- return this;
- }
- @Override
- public int getMajorCompactionThreads() {
- return majorCompactionConcurrency;
- }
-
- @Override
- public HDFSStoreFactory setInputFileSizeMax(int size) {
- HDFSStoreCreation.assertIsPositive("HDFS_COMPACTION_MAX_INPUT_FILE_SIZE_MB", size);
- this.maxInputFileSizeMB = size;
- return this;
- }
- @Override
- public int getInputFileSizeMax() {
- return maxInputFileSizeMB;
- }
-
- @Override
- public HDFSStoreFactory setInputFileCountMin(int count) {
- HDFSStoreCreation.assertIsPositive("HDFS_COMPACTION_MIN_INPUT_FILE_COUNT", count);
- this.minInputFileCount = count;
- return this;
- }
- @Override
- public int getInputFileCountMin() {
- return minInputFileCount;
- }
-
- @Override
- public HDFSStoreFactory setInputFileCountMax(int count) {
- HDFSStoreCreation.assertIsPositive("HDFS_COMPACTION_MAX_INPUT_FILE_COUNT", count);
- this.maxInputFileCount = count;
- return this;
- }
- @Override
- public int getInputFileCountMax() {
- return maxInputFileCount;
- }
-
- @Override
- public int getPurgeInterval() {
- return oldFileCleanupIntervalMins;
- }
- @Override
- public HDFSStoreFactory setPurgeInterval(int interval) {
- assertIsPositive(CacheXml.HDFS_PURGE_INTERVAL, interval);
- this.oldFileCleanupIntervalMins = interval;
- return this;
- }
-
- protected void validate() {
- if (minInputFileCount > maxInputFileCount) {
- throw new IllegalArgumentException(
- LocalizedStrings.HOPLOG_MIN_IS_MORE_THAN_MAX
- .toLocalizedString(new Object[] {
- "HDFS_COMPACTION_MIN_INPUT_FILE_COUNT",
- minInputFileCount,
- "HDFS_COMPACTION_MAX_INPUT_FILE_COUNT",
- maxInputFileCount }));
- }
- }
-
- /**
- * This method should not be called on this class.
- * @see HDFSStoreFactory#create(String)
- */
- @Override
- public HDFSStore create(String name) throws GemFireConfigException,
- StoreExistsException {
- throw new UnsupportedOperationException();
- }
-
- /**
- * This method should not be called on this class.
- * @see HDFSStoreImpl#destroy()
- */
- @Override
- public void destroy() {
- throw new UnsupportedOperationException();
- }
-
- public static void assertIsPositive(String name, int count) {
- if (count < 1) {
- throw new IllegalArgumentException(
- LocalizedStrings.DiskWriteAttributesImpl_0_HAS_TO_BE_POSITIVE_NUMBER_AND_THE_VALUE_GIVEN_1_IS_NOT_ACCEPTABLE
- .toLocalizedString(new Object[] { name, count }));
- }
- }
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("HDFSStoreConfigHolder@");
- builder.append(System.identityHashCode(this));
- builder.append(" [");
- appendStrProp(builder, name, "name");
- appendStrProp(builder, namenodeURL, "namenodeURL");
- appendStrProp(builder, homeDir, "homeDir");
- appendStrProp(builder, clientConfigFile, "clientConfigFile");
- if (blockCacheSize > -1) {
- builder.append("blockCacheSize=");
- builder.append(blockCacheSize);
- builder.append(", ");
- }
- appendIntProp(builder, maxFileSize, "maxFileSize");
- appendIntProp(builder, fileRolloverInterval, "fileRolloverInterval");
- appendBoolProp(builder, isAutoCompact, "isAutoCompact");
- appendBoolProp(builder, autoMajorCompact, "autoMajorCompact");
- appendIntProp(builder, maxConcurrency, "maxConcurrency");
- appendIntProp(builder, majorCompactionConcurrency, "majorCompactionConcurrency");
- appendIntProp(builder, majorCompactionIntervalMins, "majorCompactionIntervalMins");
- appendIntProp(builder, maxInputFileSizeMB, "maxInputFileSizeMB");
- appendIntProp(builder, maxInputFileCount, "maxInputFileCount");
- appendIntProp(builder, minInputFileCount, "minInputFileCount");
- appendIntProp(builder, oldFileCleanupIntervalMins, "oldFileCleanupIntervalMins");
- appendIntProp(builder, batchSize, "batchSize");
- appendIntProp(builder, batchIntervalMillis, "batchInterval");
- appendIntProp(builder, maximumQueueMemory, "maximumQueueMemory");
- appendIntProp(builder, dispatcherThreads, "dispatcherThreads");
- appendBoolProp(builder, isPersistenceEnabled, "isPersistenceEnabled");
- appendStrProp(builder, diskStoreName, "diskStoreName");
- appendBoolProp(builder, diskSynchronous, "diskSynchronous");
-
- builder.append("]");
- return builder.toString();
- }
-
- private void appendStrProp(StringBuilder builder, String value, String name) {
- if (value != null) {
- builder.append(name + "=");
- builder.append(value);
- builder.append(", ");
- }
- }
-
- private void appendIntProp(StringBuilder builder, int value, String name) {
- if (value > -1) {
- builder.append(name + "=");
- builder.append(value);
- builder.append(", ");
- }
- }
-
- private void appendBoolProp(StringBuilder builder, boolean value, String name) {
- builder.append(name + "=");
- builder.append(value);
- builder.append(", ");
- }
-
- @Override
- public HDFSStoreMutator createHdfsStoreMutator() {
- // As part of alter execution, the HDFS store replaces the config holder
- // completely, so a mutator on the config holder is not needed.
- throw new UnsupportedOperationException();
- }
-
- @Override
- public HDFSStore alter(HDFSStoreMutator mutator) {
- // As part of alter execution, the HDFS store replaces the config holder
- // completely, so a mutator on the config holder is not needed.
- throw new UnsupportedOperationException();
- }
-
- @Override
- public String getDiskStoreName() {
- return this.diskStoreName;
- }
- @Override
- public HDFSStoreFactory setDiskStoreName(String name) {
- this.diskStoreName = name;
- return this;
- }
-
- @Override
- public int getBatchInterval() {
- return this.batchIntervalMillis;
- }
- @Override
- public HDFSStoreFactory setBatchInterval(int intervalMillis){
- this.batchIntervalMillis = intervalMillis;
- return this;
- }
-
- @Override
- public boolean getBufferPersistent() {
- return isPersistenceEnabled;
- }
- @Override
- public HDFSStoreFactory setBufferPersistent(boolean isPersistent) {
- this.isPersistenceEnabled = isPersistent;
- return this;
- }
-
- @Override
- public int getDispatcherThreads() {
- return dispatcherThreads;
- }
- @Override
- public HDFSStoreFactory setDispatcherThreads(int dispatcherThreads) {
- this.dispatcherThreads = dispatcherThreads;
- return this;
- }
-
- @Override
- public int getMaxMemory() {
- return this.maximumQueueMemory;
- }
- @Override
- public HDFSStoreFactory setMaxMemory(int memory) {
- this.maximumQueueMemory = memory;
- return this;
- }
-
- @Override
- public int getBatchSize() {
- return this.batchSize;
- }
- @Override
- public HDFSStoreFactory setBatchSize(int size){
- this.batchSize = size;
- return this;
- }
-
- @Override
- public boolean getSynchronousDiskWrite() {
- return this.diskSynchronous;
- }
- @Override
- public HDFSStoreFactory setSynchronousDiskWrite(boolean isSynchronous) {
- this.diskSynchronous = isSynchronous;
- return this;
- }
-}
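
The class comment above describes the pattern that the rest of the file implements: one holder backs both the store view and the factory view, and every fluent setter funnels into it. A distilled sketch under hypothetical names (ConfigHolder, StoreFactory, SharedHolderSketch; the real types are HDFSStoreConfigHolder, HDFSStoreCreation, and HDFSStoreImpl):

// Hypothetical distillation of the shared-config-holder pattern above.
class ConfigHolder {
  private String name;
  private int batchSizeMb = 32; // illustrative default, not Geode's

  ConfigHolder setName(String n) { this.name = n; return this; }
  ConfigHolder setBatchSizeMb(int mb) { this.batchSizeMb = mb; return this; }
  String getName() { return name; }
  int getBatchSizeMb() { return batchSizeMb; }
}

class StoreFactory {
  // The factory owns a holder and forwards each setter to it, so the
  // attribute list lives in exactly one class.
  private final ConfigHolder holder = new ConfigHolder();

  StoreFactory setName(String n) { holder.setName(n); return this; }
  StoreFactory setBatchSizeMb(int mb) { holder.setBatchSizeMb(mb); return this; }

  ConfigHolder build() { return holder; } // the real create() also validates
}

public class SharedHolderSketch {
  public static void main(String[] args) {
    ConfigHolder cfg = new StoreFactory().setName("demo").setBatchSizeMb(64).build();
    System.out.println(cfg.getName() + " " + cfg.getBatchSizeMb()); // demo 64
  }
}

Altering a store's configuration then amounts to swapping in a fresh holder, which is why alter() and createHdfsStoreMutator() above throw UnsupportedOperationException on the holder itself.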
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreCreation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreCreation.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreCreation.java
deleted file mode 100644
index 9ecc5e3..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreCreation.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import com.gemstone.gemfire.GemFireConfigException;
-import com.gemstone.gemfire.cache.hdfs.HDFSStore;
-import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory;
-import com.gemstone.gemfire.cache.hdfs.StoreExistsException;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-
-/**
- * {@link HDFSStoreFactory} implementation that accumulates configuration in
- * an {@link HDFSStoreConfigHolder} until the store is created.
- */
-public class HDFSStoreCreation implements HDFSStoreFactory {
- protected HDFSStoreConfigHolder configHolder;
-
- public HDFSStoreCreation() {
- this(null);
- }
-
- /**
- * Copy constructor for HDFSStoreCreation
- * @param config configuration source for creating this instance
- */
- public HDFSStoreCreation(HDFSStoreCreation config) {
- this.configHolder = new HDFSStoreConfigHolder(config == null ? null : config.configHolder);
- }
-
- @Override
- public HDFSStoreFactory setName(String name) {
- configHolder.setName(name);
- return this;
- }
-
- @Override
- public HDFSStoreFactory setNameNodeURL(String namenodeURL) {
- configHolder.setNameNodeURL(namenodeURL);
- return this;
- }
-
- @Override
- public HDFSStoreFactory setHomeDir(String homeDir) {
- configHolder.setHomeDir(homeDir);
- return this;
- }
-
- @Override
- public HDFSStoreFactory setHDFSClientConfigFile(String clientConfigFile) {
- configHolder.setHDFSClientConfigFile(clientConfigFile);
- return this;
- }
-
- @Override
- public HDFSStoreFactory setBlockCacheSize(float percentage) {
- configHolder.setBlockCacheSize(percentage);
- return this;
- }
-
- @Override
- public HDFSStoreFactory setWriteOnlyFileRolloverSize(int maxFileSize) {
- configHolder.setWriteOnlyFileRolloverSize(maxFileSize);
- return this;
- }
-
- @Override
- public HDFSStoreFactory setWriteOnlyFileRolloverInterval(int count) {
- configHolder.setWriteOnlyFileRolloverInterval(count);
- return this;
- }
-
- @Override
- public HDFSStoreFactory setMinorCompaction(boolean auto) {
- configHolder.setMinorCompaction(auto);
- return this;
- }
-
- @Override
- public HDFSStoreFactory setMinorCompactionThreads(int count) {
- configHolder.setMinorCompactionThreads(count);
- return this;
- }
-
- @Override
- public HDFSStoreFactory setMajorCompaction(boolean auto) {
- configHolder.setMajorCompaction(auto);
- return this;
- }
-
- @Override
- public HDFSStoreFactory setMajorCompactionInterval(int count) {
- configHolder.setMajorCompactionInterval(count);
- return this;
- }
-
- @Override
- public HDFSStoreFactory setMajorCompactionThreads(int count) {
- configHolder.setMajorCompactionThreads(count);
- return this;
- }
-
- @Override
- public HDFSStoreFactory setInputFileSizeMax(int size) {
- configHolder.setInputFileSizeMax(size);
- return this;
- }
-
- @Override
- public HDFSStoreFactory setInputFileCountMin(int count) {
- configHolder.setInputFileCountMin(count);
- return this;
- }
-
- @Override
- public HDFSStoreFactory setInputFileCountMax(int count) {
- configHolder.setInputFileCountMax(count);
- return this;
- }
-
- @Override
- public HDFSStoreFactory setPurgeInterval(int interval) {
- configHolder.setPurgeInterval(interval);
- return this;
- }
-
- @Override
- public HDFSStoreFactory setDiskStoreName(String name) {
- configHolder.setDiskStoreName(name);
- return this;
- }
-
- @Override
- public HDFSStoreFactory setMaxMemory(int memory) {
- configHolder.setMaxMemory(memory);
- return this;
- }
-
- @Override
- public HDFSStoreFactory setBatchInterval(int intervalMillis) {
- configHolder.setBatchInterval(intervalMillis);
- return this;
- }
-
- @Override
- public HDFSStoreFactory setBatchSize(int size) {
- configHolder.setBatchSize(size);
- return this;
- }
-
- @Override
- public HDFSStoreFactory setBufferPersistent(boolean isPersistent) {
- configHolder.setBufferPersistent(isPersistent);
- return this;
- }
-
- @Override
- public HDFSStoreFactory setSynchronousDiskWrite(boolean isSynchronous) {
- configHolder.setSynchronousDiskWrite(isSynchronous);
- return this;
- }
-
- @Override
- public HDFSStoreFactory setDispatcherThreads(int dispatcherThreads) {
- configHolder.setDispatcherThreads(dispatcherThreads);
- return this;
- }
-
- /**
- * This method should not be called on this class.
- * @see HDFSStoreFactory#create(String)
- */
- @Override
- public HDFSStore create(String name) throws GemFireConfigException,
- StoreExistsException {
- throw new UnsupportedOperationException();
- }
-
- public static void assertIsPositive(String name, int count) {
- if (count < 1) {
- throw new IllegalArgumentException(
- LocalizedStrings.DiskWriteAttributesImpl_0_HAS_TO_BE_POSITIVE_NUMBER_AND_THE_VALUE_GIVEN_1_IS_NOT_ACCEPTABLE
- .toLocalizedString(new Object[] { name, count }));
- }
- }
-}
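
Both deleted classes share the assertIsPositive guard, which rejects any count below 1 before it reaches the config holder. A self-contained sketch of that contract follows; the message text is paraphrased rather than pulled from the LocalizedStrings resource:

public final class PositiveGuard {
  private PositiveGuard() {}

  // Mirrors the assertIsPositive contract above: values below 1 are rejected.
  public static void assertIsPositive(String name, int count) {
    if (count < 1) {
      throw new IllegalArgumentException(
          name + " has to be a positive number; the value given, " + count
              + ", is not acceptable");
    }
  }

  public static void main(String[] args) {
    assertIsPositive("dispatcherThreads", 4); // passes silently
    assertIsPositive("batchSize", 0);         // throws IllegalArgumentException
  }
}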
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreFactoryImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreFactoryImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreFactoryImpl.java
deleted file mode 100644
index 749f01c..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreFactoryImpl.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import com.gemstone.gemfire.GemFireConfigException;
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.hdfs.HDFSStore;
-import com.gemstone.gemfire.cache.hdfs.StoreExistsException;
-import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-
-
-/**
- * Implementation of HDFSStoreFactory
- *
- */
-public class HDFSStoreFactoryImpl extends HDFSStoreCreation {
- public static final String DEFAULT_ASYNC_QUEUE_ID_FOR_HDFS = "HDFS_QUEUE";
-
- private Cache cache;
-
- public HDFSStoreFactoryImpl(Cache cache) {
- this(cache, null);
- }
-
- public HDFSStoreFactoryImpl(Cache cache, HDFSStoreCreation config) {
- super(config);
- this.cache = cache;
- }
-
- @Override
- public HDFSStore create(String name) {
- if (name == null) {
- throw new GemFireConfigException("HDFS store name not provided");
- }
-
- this.configHolder.validate();
-
- HDFSStore result = null;
- synchronized (this) {
- if (this.cache instanceof GemFireCacheImpl) {
- GemFireCacheImpl gfc = (GemFireCacheImpl) this.cache;
- if (gfc.findHDFSStore(name) != null) {
- throw new StoreExistsException(name);
- }
-
- HDFSStoreImpl hsi = new HDFSStoreImpl(name, this.configHolder);
- gfc.addHDFSStore(hsi);
- result = hsi;
- }
- }
- return result;
- }
-
- public static final String getEventQueueName(String regionPath) {
- return HDFSStoreFactoryImpl.DEFAULT_ASYNC_QUEUE_ID_FOR_HDFS + "_"
- + regionPath.replace('/', '_');
- }
-
- public HDFSStore getConfigView() {
- return (HDFSStore) configHolder;
- }
-}
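
The deleted getEventQueueName helper derives an async queue id from a region path by prefixing the HDFS_QUEUE constant and flattening path separators. A tiny runnable sketch of the same transformation:

public class QueueNameSketch {
  static final String PREFIX = "HDFS_QUEUE"; // DEFAULT_ASYNC_QUEUE_ID_FOR_HDFS above

  static String eventQueueName(String regionPath) {
    // Same shape as the deleted helper: prefix, underscore, slashes flattened.
    return PREFIX + "_" + regionPath.replace('/', '_');
  }

  public static void main(String[] args) {
    // A region path like "/orders/2016" becomes "HDFS_QUEUE__orders_2016";
    // the leading slash turns into the second underscore.
    System.out.println(eventQueueName("/orders/2016"));
  }
}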
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreImpl.java
deleted file mode 100644
index b5d56b6..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreImpl.java
+++ /dev/null
@@ -1,638 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URI;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.concurrent.Callable;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.io.hfile.BlockCache;
-import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.net.ConnectTimeoutException;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
-import com.gemstone.gemfire.cache.hdfs.HDFSStore;
-import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSCompactionManager;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSStoreDirector;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog.HoplogWriter;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogConfig;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.HoplogUtil;
-import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
-import com.gemstone.gemfire.internal.cache.control.HeapMemoryMonitor;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.HFileStoreStatistics;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
-import com.gemstone.gemfire.internal.util.SingletonCallable;
-import com.gemstone.gemfire.internal.util.SingletonValue;
-import com.gemstone.gemfire.internal.util.SingletonValue.SingletonBuilder;
-
-/**
- * Represents an HDFS-based persistent store for region data.
- *
- */
-public class HDFSStoreImpl implements HDFSStore {
-
- private volatile HDFSStoreConfigHolder configHolder;
-
- private final SingletonValue<FileSystem> fs;
-
- /**
- * Used to make sure that only one thread creates the writer at a time. This
- * prevents the dispatcher threads from cascading on the Connection lock in
- * the DFS client; see bug 51195.
- */
- private final SingletonCallable<HoplogWriter> singletonWriter = new SingletonCallable<HoplogWriter>();
-
- private final HFileStoreStatistics stats;
- private final BlockCache blockCache;
-
- private static HashSet<String> secureNameNodes = new HashSet<String>();
-
- private final boolean PERFORM_SECURE_HDFS_CHECK = Boolean.getBoolean(HoplogConfig.PERFORM_SECURE_HDFS_CHECK_PROP);
- private static final Logger logger = LogService.getLogger();
- protected final String logPrefix;
-
- static {
- HdfsConfiguration.init();
- }
-
- public HDFSStoreImpl(String name, final HDFSStore config) {
- this.configHolder = new HDFSStoreConfigHolder(config);
- configHolder.setName(name);
-
- this.logPrefix = "<" + "HdfsStore:" + name + "> ";
-
- stats = new HFileStoreStatistics(InternalDistributedSystem.getAnyInstance(), "HDFSStoreStatistics", name);
-
- final Configuration hconf = new Configuration();
-
- // Set the block cache size.
- // Disable the static block cache. We keep our own cache on the HDFS Store
- // hconf.setFloat("hfile.block.cache.size", 0f);
- if (this.getBlockCacheSize() != 0) {
- long cacheSize = (long) (HeapMemoryMonitor.getTenuredPoolMaxMemory() * this.getBlockCacheSize() / 100);
-
- // TODO use an off heap block cache if we're using off heap memory?
- // See CacheConfig.instantiateBlockCache.
- // According to Anthony, the off heap block cache is still
- // experimental. Our own off heap cache might be a better bet.
-// this.blockCache = new LruBlockCache(cacheSize,
-// StoreFile.DEFAULT_BLOCKSIZE_SMALL, hconf, HFileSortedOplogFactory.convertStatistics(stats));
- this.blockCache = new LruBlockCache(cacheSize, StoreFile.DEFAULT_BLOCKSIZE_SMALL, hconf);
- } else {
- this.blockCache = null;
- }
-
- final String clientFile = config.getHDFSClientConfigFile();
- fs = new SingletonValue<FileSystem>(new SingletonBuilder<FileSystem>() {
- @Override
- public FileSystem create() throws IOException {
- return createFileSystem(hconf, clientFile, false);
- }
-
- @Override
- public void postCreate() {
- }
-
- @Override
- public void createInProgress() {
- }
- });
-
- FileSystem fileSystem = null;
- try {
- fileSystem = fs.get();
- } catch (Throwable ex) {
- throw new HDFSIOException(ex.getMessage(),ex);
- }
- //HDFSCompactionConfig has already been initialized
- long cleanUpIntervalMillis = getPurgeInterval() * 60 * 1000;
- Path cleanUpIntervalPath = new Path(getHomeDir(), HoplogConfig.CLEAN_UP_INTERVAL_FILE_NAME);
- HoplogUtil.exposeCleanupIntervalMillis(fileSystem, cleanUpIntervalPath, cleanUpIntervalMillis);
- }
-
- /**
- * Creates a new file system every time.
- */
- public FileSystem createFileSystem() {
- Configuration hconf = new Configuration();
- try {
- return createFileSystem(hconf, this.getHDFSClientConfigFile(), true);
- } catch (Throwable ex) {
- throw new HDFSIOException(ex.getMessage(),ex);
- }
- }
-
- private FileSystem createFileSystem(Configuration hconf, String configFile, boolean forceNew) throws IOException {
- FileSystem filesystem = null;
-
- // load hdfs client config file if specified. The path is on local file
- // system
- if (configFile != null) {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Adding resource config file to hdfs configuration:" + configFile, logPrefix);
- }
- hconf.addResource(new Path(configFile));
-
- if (! new File(configFile).exists()) {
- logger.warn(LocalizedMessage.create(LocalizedStrings.HOPLOG_HDFS_CLIENT_CONFIG_FILE_ABSENT, configFile));
- }
- }
-
- // This setting disables the shutdown hook for the file system object. The
- // shutdown hook may close the FS object before the cache or store does,
- // causing unpredictable behavior. The setting is provided for GFXD-like
- // server use cases where FS close is managed by the server. It is not
- // supported by old versions of hadoop; see HADOOP-4829.
- hconf.setBoolean("fs.automatic.close", false);
-
- // Hadoop has a configuration parameter, io.serializations, that lists
- // serialization classes used for obtaining serializers and deserializers.
- // By default the list contains avro classes. When a sequence file is
- // created, SerializationFactory.getSerializer(keyclass) is called, which
- // reflectively instantiates every class in io.serializations; since no
- // avro class is available, this throws an exception. So before creating
- // a sequence file, override the io.serializations parameter and pass only
- // the classes that are important to us.
- hconf.setStrings("io.serializations",
- new String[]{"org.apache.hadoop.io.serializer.WritableSerialization"});
- // create writer
-
- SchemaMetrics.configureGlobally(hconf);
-
- String nameNodeURL = null;
- if ((nameNodeURL = getNameNodeURL()) == null) {
- nameNodeURL = hconf.get("fs.default.name");
- }
-
- URI namenodeURI = URI.create(nameNodeURL);
-
- //if (! GemFireCacheImpl.getExisting().isHadoopGfxdLonerMode()) {
- String authType = hconf.get("hadoop.security.authentication");
-
- //The following code handles Gemfire XD with secure HDFS
- //A static set is used to cache all known secure HDFS NameNode urls.
- UserGroupInformation.setConfiguration(hconf);
-
- //Compare the authentication method ignoring case to keep GFXD compliant
- //with future versions. At least version 2.0.2 starts complaining if the
- //string "kerberos" is not all lower case, though the current version of
- //hadoop seems to accept the authType in any case.
- if (authType.equalsIgnoreCase("kerberos")) {
-
- String principal = hconf.get(HoplogConfig.KERBEROS_PRINCIPAL);
- String keyTab = hconf.get(HoplogConfig.KERBEROS_KEYTAB_FILE);
-
- if (!PERFORM_SECURE_HDFS_CHECK) {
- if (logger.isDebugEnabled())
- logger.debug("{}Ignore secure hdfs check", logPrefix);
- } else {
- if (!secureNameNodes.contains(nameNodeURL)) {
- if (logger.isDebugEnabled())
- logger.debug("{}Executing secure hdfs check", logPrefix);
- try{
- filesystem = FileSystem.newInstance(namenodeURI, hconf);
- //An insecure NameNode serves this request without error; a secure one rejects it with an IOException.
- filesystem.listFiles(new Path("/"),false);
- throw new HDFSIOException("Gemfire XD HDFS client and HDFS cluster security levels do not match. The configured HDFS Namenode is not secured.");
- } catch (IOException ex) {
- secureNameNodes.add(nameNodeURL);
- } finally {
- //Close filesystem to avoid resource leak
- if(filesystem != null) {
- closeFileSystemIgnoreError(filesystem);
- }
- }
- }
- }
-
- // check to ensure the namenode principal is defined
- String nameNodePrincipal = hconf.get("dfs.namenode.kerberos.principal");
- if (nameNodePrincipal == null) {
- throw new IOException(LocalizedStrings.GF_KERBEROS_NAMENODE_PRINCIPAL_UNDEF.toLocalizedString());
- }
-
- // ok, the user specified a gfxd principal so we will try to login
- if (principal != null) {
- //If NameNode principal is the same as Gemfire XD principal, there is a
- //potential security hole
- String regex = "[/@]";
- if (nameNodePrincipal != null) {
- String HDFSUser = nameNodePrincipal.split(regex)[0];
- String GFXDUser = principal.split(regex)[0];
- if (HDFSUser.equals(GFXDUser)) {
- logger.warn(LocalizedMessage.create(LocalizedStrings.HDFS_USER_IS_SAME_AS_GF_USER, GFXDUser));
- }
- }
-
- // a keytab must exist if the user specifies a principal
- if (keyTab == null) {
- throw new IOException(LocalizedStrings.GF_KERBEROS_KEYTAB_UNDEF.toLocalizedString());
- }
-
- // the keytab must exist as well
- File f = new File(keyTab);
- if (!f.exists()) {
- throw new FileNotFoundException(LocalizedStrings.GF_KERBEROS_KEYTAB_FILE_ABSENT.toLocalizedString(f.getAbsolutePath()));
- }
-
- //Authenticate Gemfire XD principal to Kerberos KDC using Gemfire XD keytab file
- String principalWithValidHost = SecurityUtil.getServerPrincipal(principal, "");
- UserGroupInformation.loginUserFromKeytab(principalWithValidHost, keyTab);
- } else {
- logger.warn(LocalizedMessage.create(LocalizedStrings.GF_KERBEROS_PRINCIPAL_UNDEF));
- }
- }
- //}
-
- filesystem = getFileSystemFactory().create(namenodeURI, hconf, forceNew);
-
- if (logger.isDebugEnabled()) {
- logger.debug("{}Initialized FileSystem linked to " + filesystem.getUri()
- + " " + filesystem.hashCode(), logPrefix);
- }
- return filesystem;
- }
-
- public FileSystem getFileSystem() throws IOException {
- return fs.get();
- }
-
- public FileSystem getCachedFileSystem() {
- return fs.getCachedValue();
- }
-
- public SingletonCallable<HoplogWriter> getSingletonWriter() {
- return this.singletonWriter;
- }
-
- private final SingletonCallable<Boolean> fsExists = new SingletonCallable<Boolean>();
-
- public boolean checkFileSystemExists() throws IOException {
- try {
- return fsExists.runSerially(new Callable<Boolean>() {
- @Override
- public Boolean call() throws Exception {
- FileSystem fileSystem = getCachedFileSystem();
- if (fileSystem == null) {
- return false;
- }
- return fileSystem.exists(new Path("/"));
- }
- });
- } catch (Exception e) {
- if (e instanceof IOException) {
- throw (IOException)e;
- }
- throw new IOException(e);
- }
- }
-
- /**
- * This method executes a query against the NameNode. If the query succeeds,
- * the FS instance is healthy; if it fails, the old instance is closed and a
- * new instance is created.
- */
- public void checkAndClearFileSystem() {
- FileSystem fileSystem = getCachedFileSystem();
-
- if (fileSystem != null) {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Checking file system at " + fileSystem.getUri(), logPrefix);
- }
- try {
- checkFileSystemExists();
- if (logger.isDebugEnabled()) {
- logger.debug("{}FS client is ok: " + fileSystem.getUri() + " "
- + fileSystem.hashCode(), logPrefix);
- }
- return;
- } catch (ConnectTimeoutException e) {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Hdfs unreachable, FS client is ok: "
- + fileSystem.getUri() + " " + fileSystem.hashCode(), logPrefix);
- }
- return;
- } catch (IOException e) {
- logger.debug("IOError in filesystem checkAndClear ", e);
-
- // The file system is closed or NN is not reachable. It is safest to
- // create a new FS instance. If the NN continues to remain unavailable,
- // all subsequent read/write request will cause HDFSIOException. This is
- // similar to the way hbase manages failures. This has a drawback
- // though. A network blip will result in all connections to be
- // recreated. However trying to preserve the connections and waiting for
- // FS to auto-recover is not deterministic.
- if (e instanceof RemoteException) {
- e = ((RemoteException) e).unwrapRemoteException();
- }
-
- logger.warn(LocalizedMessage.create(LocalizedStrings.HOPLOG_HDFS_UNREACHABLE,
- fileSystem.getUri()), e);
- }
-
- // compare-and-clear the FS container; the container must remain reusable
- boolean result = fs.clear(fileSystem, true);
- if (!result) {
- // the FS instance changed after this call was initiated, so check again
- logger.debug("{}Failed to clear FS; instance changed concurrently, retrying ...", logPrefix);
- checkAndClearFileSystem();
- } else {
- closeFileSystemIgnoreError(fileSystem);
- }
- }
- }
-
- private void closeFileSystemIgnoreError(FileSystem fileSystem) {
- if (fileSystem == null) {
- logger.debug("{}Trying to close null file system", logPrefix);
- return;
- }
-
- try {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Closing file system at " + fileSystem.getUri() + " "
- + fileSystem.hashCode(), logPrefix);
- }
- fileSystem.close();
- } catch (Exception e) {
- if (logger.isDebugEnabled()) {
- logger.debug("Failed to close file system at " + fileSystem.getUri()
- + " " + fileSystem.hashCode(), e);
- }
- }
- }
-
- public HFileStoreStatistics getStats() {
- return stats;
- }
-
- public BlockCache getBlockCache() {
- return blockCache;
- }
-
- public void close() {
- logger.debug("{}Closing file system: " + getName(), logPrefix);
- stats.close();
- blockCache.shutdown();
- // The block cache could be cleared explicitly, but it will be garbage
- // collected once dereferenced.
-
- // Release the DDL hoplog organizer for this store and shut down the
- // compaction threads. These two resources hold references to the
- // GemFireCacheImpl instance. Any error in releasing these resources is
- // not critical and can be ignored.
- try {
- HDFSCompactionManager manager = HDFSCompactionManager.getInstance(this);
- if (manager != null) {
- manager.reset();
- }
- } catch (Exception e) {
- logger.info(e);
- }
-
- // once this store is closed, this store should not be used again
- FileSystem fileSystem = fs.clear(false);
- if (fileSystem != null) {
- closeFileSystemIgnoreError(fileSystem);
- }
- }
-
- /**
- * Test hook to remove all of the contents of the folder
- * for this HDFS store from HDFS.
- * @throws IOException
- */
- public void clearFolder() throws IOException {
- getFileSystem().delete(new Path(getHomeDir()), true);
- }
-
- @Override
- public void destroy() {
- Collection<String> regions = HDFSRegionDirector.getInstance().getRegionsInStore(this);
- if(!regions.isEmpty()) {
- throw new IllegalStateException("Cannot destroy a HDFS store that still contains regions: " + regions);
- }
- close();
- HDFSStoreDirector.getInstance().removeHDFSStore(this.getName());
- }
-
- @Override
- public synchronized HDFSStore alter(HDFSStoreMutator mutator) {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Altering hdfsStore " + this, logPrefix);
- logger.debug("{}Mutator " + mutator, logPrefix);
- }
- HDFSStoreConfigHolder newHolder = new HDFSStoreConfigHolder(configHolder);
- newHolder.copyFrom(mutator);
- newHolder.validate();
- HDFSStore oldStore = configHolder;
- configHolder = newHolder;
- if (logger.isDebugEnabled()) {
- logger.debug("{}Resuult of Alter " + this, logPrefix);
- }
- return oldStore;
- }
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("HDFSStoreImpl [");
- if (configHolder != null) {
- builder.append("configHolder=");
- builder.append(configHolder);
- }
- builder.append("]");
- return builder.toString();
- }
-
- @Override
- public String getName() {
- return configHolder.getName();
- }
-
- @Override
- public String getNameNodeURL() {
- return configHolder.getNameNodeURL();
- }
-
- @Override
- public String getHomeDir() {
- return configHolder.getHomeDir();
- }
-
- @Override
- public String getHDFSClientConfigFile() {
- return configHolder.getHDFSClientConfigFile();
- }
-
- @Override
- public float getBlockCacheSize() {
- return configHolder.getBlockCacheSize();
- }
-
- @Override
- public int getWriteOnlyFileRolloverSize() {
- return configHolder.getWriteOnlyFileRolloverSize();
- }
-
- @Override
- public int getWriteOnlyFileRolloverInterval() {
- return configHolder.getWriteOnlyFileRolloverInterval();
- }
-
- @Override
- public boolean getMinorCompaction() {
- return configHolder.getMinorCompaction();
- }
-
- @Override
- public int getMinorCompactionThreads() {
- return configHolder.getMinorCompactionThreads();
- }
-
- @Override
- public boolean getMajorCompaction() {
- return configHolder.getMajorCompaction();
- }
-
- @Override
- public int getMajorCompactionInterval() {
- return configHolder.getMajorCompactionInterval();
- }
-
- @Override
- public int getMajorCompactionThreads() {
- return configHolder.getMajorCompactionThreads();
- }
-
-
- @Override
- public int getInputFileSizeMax() {
- return configHolder.getInputFileSizeMax();
- }
-
- @Override
- public int getInputFileCountMin() {
- return configHolder.getInputFileCountMin();
- }
-
- @Override
- public int getInputFileCountMax() {
- return configHolder.getInputFileCountMax();
- }
-
- @Override
- public int getPurgeInterval() {
- return configHolder.getPurgeInterval();
- }
-
- @Override
- public String getDiskStoreName() {
- return configHolder.getDiskStoreName();
- }
-
- @Override
- public int getMaxMemory() {
- return configHolder.getMaxMemory();
- }
-
- @Override
- public int getBatchSize() {
- return configHolder.getBatchSize();
- }
-
- @Override
- public int getBatchInterval() {
- return configHolder.getBatchInterval();
- }
-
- @Override
- public boolean getBufferPersistent() {
- return configHolder.getBufferPersistent();
- }
-
- @Override
- public boolean getSynchronousDiskWrite() {
- return configHolder.getSynchronousDiskWrite();
- }
-
- @Override
- public int getDispatcherThreads() {
- return configHolder.getDispatcherThreads();
- }
-
- @Override
- public HDFSStoreMutator createHdfsStoreMutator() {
- return new HDFSStoreMutatorImpl();
- }
-
- public FileSystemFactory getFileSystemFactory() {
- return new DistributedFileSystemFactory();
- }
-
- /*
- * Factory to create HDFS file system instances
- */
- public interface FileSystemFactory {
- FileSystem create(URI namenode, Configuration conf, boolean forceNew) throws IOException;
- }
-
- /*
- * File system factory implementations for creating instances of file system
- * connected to distributed HDFS cluster
- */
- public class DistributedFileSystemFactory implements FileSystemFactory {
- private final boolean ALLOW_TEST_FILE_SYSTEM = Boolean.getBoolean(HoplogConfig.ALLOW_LOCAL_HDFS_PROP);
- private final boolean USE_FS_CACHE = Boolean.getBoolean(HoplogConfig.USE_FS_CACHE);
-
- @Override
- public FileSystem create(URI nn, Configuration conf, boolean forceNew) throws IOException {
- FileSystem filesystem;
-
- if (USE_FS_CACHE && !forceNew) {
- filesystem = FileSystem.get(nn, conf);
- } else {
- filesystem = FileSystem.newInstance(nn, conf);
- }
-
- if (filesystem instanceof LocalFileSystem && !ALLOW_TEST_FILE_SYSTEM) {
- closeFileSystemIgnoreError(filesystem);
- throw new IllegalStateException(
- LocalizedStrings.HOPLOG_TRYING_TO_CREATE_STANDALONE_SYSTEM.toLocalizedString(getNameNodeURL()));
- }
-
- return filesystem;
- }
- }
-}
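The checkFileSystemExists()/checkAndClearFileSystem() pair above implements a probe-and-recreate pattern: query the NameNode, and if the probe fails, drop the cached client and build a fresh one. A minimal standalone sketch of the same pattern using only the stock Hadoop client API (FsHealthCheck, fsRef, and healthyFileSystem are illustrative names; the AtomicReference stands in for the store's internal FS container):

import java.io.IOException;
import java.net.URI;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FsHealthCheck {
  // stands in for the store's internal FS container
  private final AtomicReference<FileSystem> fsRef = new AtomicReference<FileSystem>();
  private final URI nameNode;
  private final Configuration conf;

  public FsHealthCheck(URI nameNode, Configuration conf) {
    this.nameNode = nameNode;
    this.conf = conf;
  }

  // Probe the NameNode; on failure drop the cached client and create a new one.
  public FileSystem healthyFileSystem() throws IOException {
    FileSystem fs = fsRef.get();
    try {
      if (fs != null && fs.exists(new Path("/"))) {
        return fs; // probe succeeded, the cached client is still usable
      }
    } catch (IOException e) {
      // fall through: the client is stale or the NameNode was unreachable
    }
    FileSystem fresh = FileSystem.newInstance(nameNode, conf); // uncached instance
    if (!fsRef.compareAndSet(fs, fresh)) {
      fresh.close(); // another thread already swapped in a new client
      return fsRef.get();
    }
    if (fs != null) {
      try {
        fs.close(); // best effort, mirrors closeFileSystemIgnoreError()
      } catch (IOException ignored) {
      }
    }
    return fresh;
  }
}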
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreMutatorImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreMutatorImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreMutatorImpl.java
deleted file mode 100644
index 203e623..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreMutatorImpl.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import com.gemstone.gemfire.cache.hdfs.HDFSStore;
-import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-
-public class HDFSStoreMutatorImpl implements HDFSStoreMutator {
- private HDFSStoreConfigHolder configHolder;
- private Boolean autoCompact;
- private Boolean autoMajorCompact;
-
- public HDFSStoreMutatorImpl() {
- configHolder = new HDFSStoreConfigHolder();
- configHolder.resetDefaultValues();
- }
-
- public HDFSStoreMutatorImpl(HDFSStore store) {
- configHolder = new HDFSStoreConfigHolder(store);
- }
-
- public HDFSStoreMutator setWriteOnlyFileRolloverSize(int maxFileSize) {
- configHolder.setWriteOnlyFileRolloverSize(maxFileSize);
- return this;
- }
- @Override
- public int getWriteOnlyFileRolloverSize() {
- return configHolder.getWriteOnlyFileRolloverSize();
- }
-
- @Override
- public HDFSStoreMutator setWriteOnlyFileRolloverInterval(int count) {
- configHolder.setWriteOnlyFileRolloverInterval(count);
- return this;
- }
- @Override
- public int getWriteOnlyFileRolloverInterval() {
- return configHolder.getWriteOnlyFileRolloverInterval();
- }
-
- @Override
- public HDFSStoreMutator setMinorCompaction(boolean auto) {
- autoCompact = Boolean.valueOf(auto);
- configHolder.setMinorCompaction(auto);
- return this; // keep the fluent chain consistent with the other setters
- }
- @Override
- public Boolean getMinorCompaction() {
- return autoCompact;
- }
-
- @Override
- public HDFSStoreMutator setMinorCompactionThreads(int count) {
- configHolder.setMinorCompactionThreads(count);
- return this;
- }
- @Override
- public int getMinorCompactionThreads() {
- return configHolder.getMinorCompactionThreads();
- }
-
- @Override
- public HDFSStoreMutator setMajorCompaction(boolean auto) {
- autoMajorCompact = Boolean.valueOf(auto);
- configHolder.setMajorCompaction(auto);
- return this;
- }
- @Override
- public Boolean getMajorCompaction() {
- return autoMajorCompact;
- }
-
- @Override
- public HDFSStoreMutator setMajorCompactionInterval(int count) {
- configHolder.setMajorCompactionInterval(count);
- return this;
- }
- @Override
- public int getMajorCompactionInterval() {
- return configHolder.getMajorCompactionInterval();
- }
-
- @Override
- public HDFSStoreMutator setMajorCompactionThreads(int count) {
- configHolder.setMajorCompactionThreads(count);
- return this;
- }
- @Override
- public int getMajorCompactionThreads() {
- return configHolder.getMajorCompactionThreads();
- }
-
- @Override
- public HDFSStoreMutator setInputFileSizeMax(int size) {
- configHolder.setInputFileSizeMax(size);
- return this;
- }
- @Override
- public int getInputFileSizeMax() {
- return configHolder.getInputFileSizeMax();
- }
-
- @Override
- public HDFSStoreMutator setInputFileCountMin(int count) {
- configHolder.setInputFileCountMin(count);
- return this;
- }
- @Override
- public int getInputFileCountMin() {
- return configHolder.getInputFileCountMin();
- }
-
- @Override
- public HDFSStoreMutator setInputFileCountMax(int count) {
- configHolder.setInputFileCountMax(count);
- return this;
- }
- @Override
- public int getInputFileCountMax() {
- return configHolder.getInputFileCountMax();
- }
-
- @Override
- public HDFSStoreMutator setPurgeInterval(int interval) {
- configHolder.setPurgeInterval(interval);
- return this;
- }
- @Override
- public int getPurgeInterval() {
- return configHolder.getPurgeInterval();
- }
-
- @Override
- public int getBatchSize() {
- return configHolder.getBatchSize();
- }
- @Override
- public HDFSStoreMutator setBatchSize(int size) {
- configHolder.setBatchSize(size);
- return this;
- }
-
-
- @Override
- public int getBatchInterval() {
- return configHolder.getBatchInterval();
- }
- @Override
- public HDFSStoreMutator setBatchInterval(int interval) {
- configHolder.setBatchInterval(interval);
- return this;
- }
-
- public static void assertIsPositive(String name, int count) {
- if (count < 1) {
- throw new IllegalArgumentException(
- LocalizedStrings.DiskWriteAttributesImpl_0_HAS_TO_BE_POSITIVE_NUMBER_AND_THE_VALUE_GIVEN_1_IS_NOT_ACCEPTABLE
- .toLocalizedString(new Object[] { name, count }));
- }
- }
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("HDFSStoreMutatorImpl [");
- if (configHolder != null) {
- builder.append("configHolder=");
- builder.append(configHolder);
- builder.append(", ");
- }
- if (autoCompact != null) {
- builder.append("MinorCompaction=");
- builder.append(autoCompact);
- builder.append(", ");
- }
- if (getMajorCompaction() != null) {
- builder.append("autoMajorCompaction=");
- builder.append(getMajorCompaction());
- builder.append(", ");
- }
- builder.append("]");
- return builder.toString();
- }
-}
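HDFSStoreImpl.alter() and the mutator above together implement a copy-validate-swap update: the mutator records the requested deltas, and alter() copies the current config holder, applies the deltas, validates the result, and only then swaps the holder in. A hedged sketch of how calling code would drive it (the store variable is assumed to be an already-created HDFSStoreImpl):

HDFSStoreMutator mutator = store.createHdfsStoreMutator();
mutator.setMajorCompactionInterval(720); // fluent setters record only the deltas
mutator.setMajorCompactionThreads(4);
HDFSStore previous = store.alter(mutator); // validate, then swap the config holder

The resetDefaultValues() call in the mutator's default constructor suggests untouched attributes act as sentinels that copyFrom() skips, so unset values keep their current settings.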
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSWriteOnlyStoreEventListener.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSWriteOnlyStoreEventListener.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSWriteOnlyStoreEventListener.java
deleted file mode 100644
index 0298523..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSWriteOnlyStoreEventListener.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import com.gemstone.gemfire.cache.CacheClosedException;
-import com.gemstone.gemfire.cache.asyncqueue.AsyncEvent;
-import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer;
-import com.gemstone.gemfire.i18n.LogWriterI18n;
-import com.gemstone.gemfire.internal.cache.BucketRegion;
-import com.gemstone.gemfire.internal.cache.ForceReattemptException;
-import com.gemstone.gemfire.internal.cache.LocalRegion;
-import com.gemstone.gemfire.internal.cache.PartitionedRegion;
-import com.gemstone.gemfire.internal.cache.PrimaryBucketException;
-import com.gemstone.gemfire.internal.cache.execute.BucketMovedException;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-
-/**
- * Listener that persists events to a write-only HDFS store
- *
- */
-public class HDFSWriteOnlyStoreEventListener implements
- AsyncEventListener {
-
- private final LogWriterI18n logger;
- private volatile boolean senderStopped = false;
- private final FailureTracker failureTracker = new FailureTracker(10L, 60 * 1000L, 1.5f);
-
-
- public HDFSWriteOnlyStoreEventListener(LogWriterI18n logger) {
- this.logger = logger;
- }
-
- @Override
- public void close() {
- senderStopped = true;
- }
-
- @Override
- public boolean processEvents(List<AsyncEvent> events) {
- if (Hoplog.NOP_WRITE) {
- return true;
- }
-
- if (logger.fineEnabled())
- logger.fine("HDFSWriteOnlyStoreEventListener: A total of " + events.size() + " events were sent from GemFire to be persisted on HDFS");
- boolean success = false;
- try {
- failureTracker.sleepIfRetry();
- HDFSGatewayEventImpl hdfsEvent = null;
- int previousBucketId = -1;
- BatchManager bm = null;
- for (AsyncEvent asyncEvent : events) {
- if (senderStopped){
- if (logger.fineEnabled()) {
- logger.fine("HDFSWriteOnlyStoreEventListener.processEvents: Cache is closing down. Ignoring the batch of data.");
- }
- return false;
- }
- hdfsEvent = (HDFSGatewayEventImpl)asyncEvent;
- if (previousBucketId != hdfsEvent.getBucketId()){
- if (previousBucketId != -1)
- persistBatch(bm, previousBucketId);
-
- previousBucketId = hdfsEvent.getBucketId();
- bm = new BatchManager();
- }
- bm.addEvent(hdfsEvent);
- }
- // bm and hdfsEvent are still null if the events list was empty
- if (bm != null) {
- try {
- persistBatch(bm, hdfsEvent.getBucketId());
- } catch (BucketMovedException e) {
- logger.fine("Batch could not be written to HDFS as the bucket moved. bucket id: " +
- hdfsEvent.getBucketId() + " Exception: " + e);
- return false;
- }
- }
- success = true;
- } catch (IOException e) {
- logger.warning(LocalizedStrings.HOPLOG_FLUSH_FOR_BATCH_FAILED, e);
- return false;
- }
- catch (ClassNotFoundException e) {
- logger.warning(LocalizedStrings.HOPLOG_FLUSH_FOR_BATCH_FAILED, e);
- return false;
- }
- catch (CacheClosedException e) {
- // exit silently
- if (logger.fineEnabled())
- logger.fine(e);
- return false;
- } catch (ForceReattemptException e) {
- if (logger.fineEnabled())
- logger.fine(e);
- return false;
- } catch (InterruptedException e1) {
- // restore the interrupt status rather than swallowing it
- Thread.currentThread().interrupt();
- return false;
- } finally {
- failureTracker.record(success);
- }
- return true;
- }
-
- /**
- * Persists batches of multiple regions specified by the batch manager
- *
- */
- private void persistBatch(BatchManager bm, int bucketId) throws IOException, ForceReattemptException {
- Iterator<Map.Entry<LocalRegion,ArrayList<QueuedPersistentEvent>>> eventsPerRegion =
- bm.iterator();
- HoplogOrganizer bucketOrganizer = null;
- while (eventsPerRegion.hasNext()) {
- Map.Entry<LocalRegion, ArrayList<QueuedPersistentEvent>> eventsForARegion = eventsPerRegion.next();
- bucketOrganizer = getOrganizer((PartitionedRegion) eventsForARegion.getKey(), bucketId);
- // a null organizer means the bucket is no longer hosted on this member
- if (bucketOrganizer == null)
- throw new BucketMovedException("Bucket moved. BucketID: " + bucketId + " HdfsRegion: " + eventsForARegion.getKey().getName());
- bucketOrganizer.flush(eventsForARegion.getValue().iterator(), eventsForARegion.getValue().size());
- if (logger.fineEnabled()) {
- logger.fine("Batch written to HDFS of size " + eventsForARegion.getValue().size() +
- " for region " + eventsForARegion.getKey());
- }
- }
- }
-
- private HoplogOrganizer getOrganizer(PartitionedRegion region, int bucketId) {
- BucketRegion br = region.getDataStore().getLocalBucketById(bucketId);
- if (br == null) {
- // the bucket was moved or rebalanced away from this member
- throw new BucketMovedException("Bucket region is no longer available. BucketId: "+
- bucketId + " HdfsRegion: " + region.getName());
- }
-
- return br.getHoplogOrganizer();
- }
-
- /**
- * Sorts events from multiple regions into one list per region
- *
- */
- private class BatchManager implements Iterable<Map.Entry<LocalRegion,ArrayList<QueuedPersistentEvent>>> {
- private HashMap<LocalRegion, ArrayList<QueuedPersistentEvent>> regionBatches =
- new HashMap<LocalRegion, ArrayList<QueuedPersistentEvent>>();
-
- public void addEvent (HDFSGatewayEventImpl hdfsEvent) throws IOException, ClassNotFoundException {
- LocalRegion region = (LocalRegion) hdfsEvent.getRegion();
- ArrayList<QueuedPersistentEvent> regionList = regionBatches.get(region);
- if (regionList == null) {
- regionList = new ArrayList<QueuedPersistentEvent>();
- regionBatches.put(region, regionList);
- }
- regionList.add(new UnsortedHDFSQueuePersistedEvent(hdfsEvent));
- }
-
- @Override
- public Iterator<Map.Entry<LocalRegion,ArrayList<QueuedPersistentEvent>>> iterator() {
- return regionBatches.entrySet().iterator();
- }
-
- }
-}
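The BatchManager above is a small group-by: the queue delivers events ordered by bucket, and within a bucket the manager splits them into one list per region before persistBatch() flushes each list. A standalone sketch of that grouping step (the region names and payloads are made up for illustration):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupByRegion {
  public static void main(String[] args) {
    // stand-ins for HDFSGatewayEventImpl: { region name, payload } pairs
    String[][] events = { { "orders", "e1" }, { "customers", "e2" }, { "orders", "e3" } };

    // the same get/create/add dance as BatchManager.addEvent()
    Map<String, List<String>> regionBatches = new HashMap<String, List<String>>();
    for (String[] event : events) {
      List<String> batch = regionBatches.get(event[0]);
      if (batch == null) {
        batch = new ArrayList<String>();
        regionBatches.put(event[0], batch);
      }
      batch.add(event[1]);
    }

    // one flush per region, mirroring persistBatch()
    for (Map.Entry<String, List<String>> entry : regionBatches.entrySet()) {
      System.out.println(entry.getKey() + " -> " + entry.getValue().size() + " events");
    }
  }
}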
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HoplogListenerForRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HoplogListenerForRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HoplogListenerForRegion.java
deleted file mode 100644
index c7ba23f..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HoplogListenerForRegion.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogListener;
-
-/**
- * An object of this class needs to be created for every region. These
- * objects listen to hoplog events and take the appropriate action.
- *
- */
-public class HoplogListenerForRegion implements HoplogListener {
-
- private List<HoplogListener> otherListeners = new CopyOnWriteArrayList<HoplogListener>();
-
- public HoplogListenerForRegion() {
-
- }
-
- @Override
- public void hoplogCreated(String regionFolder, int bucketId,
- Hoplog... oplogs) throws IOException {
- for (HoplogListener listener : this.otherListeners) {
- listener.hoplogCreated(regionFolder, bucketId, oplogs);
- }
- }
-
- @Override
- public void hoplogDeleted(String regionFolder, int bucketId,
- Hoplog... oplogs) {
- for (HoplogListener listener : this.otherListeners) {
- try {
- listener.hoplogDeleted(regionFolder, bucketId, oplogs);
- } catch (IOException e) {
- // hoplogDeleted() cannot throw a checked IOException, so wrap it in the unchecked HDFSIOException
- throw new HDFSIOException(e.getLocalizedMessage(), e);
- }
- }
- }
-
- public void addListener(HoplogListener listener) {
- this.otherListeners.add(listener);
- }
-
- @Override
- public void compactionCompleted(String region, int bucket, boolean isMajor) {
- for (HoplogListener listener : this.otherListeners) {
- listener.compactionCompleted(region, bucket, isMajor);
- }
- }
-}
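HoplogListenerForRegion above is a plain composite listener: children are kept in a CopyOnWriteArrayList so registration can race safely with event delivery, and every callback fans out to all of them. A generic sketch of the same pattern (the Listener interface here is illustrative, not the Geode one):

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class CompositeListener {
  interface Listener {
    void onEvent(String event);
  }

  // CopyOnWriteArrayList lets addListener() race safely with event delivery
  private final List<Listener> children = new CopyOnWriteArrayList<Listener>();

  public void addListener(Listener listener) {
    children.add(listener);
  }

  public void onEvent(String event) {
    // the for-each iterates over a snapshot, so no locking is needed
    for (Listener listener : children) {
      listener.onEvent(event);
    }
  }

  public static void main(String[] args) {
    CompositeListener composite = new CompositeListener();
    composite.addListener(new Listener() {
      public void onEvent(String event) {
        System.out.println("listener A saw " + event);
      }
    });
    composite.onEvent("hoplogCreated");
  }
}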