You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ag...@apache.org on 2016/05/03 23:45:29 UTC
[15/60] [abbrv] incubator-geode git commit: GEODE-1072: Removing HDFS
related code
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSBucketRegionQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSBucketRegionQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSBucketRegionQueue.java
deleted file mode 100644
index 9127e4d..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSBucketRegionQueue.java
+++ /dev/null
@@ -1,1232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NavigableSet;
-import java.util.NoSuchElementException;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.gemstone.gemfire.InternalGemFireError;
-import com.gemstone.gemfire.cache.CacheWriterException;
-import com.gemstone.gemfire.cache.EntryNotFoundException;
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.RegionAttributes;
-import com.gemstone.gemfire.cache.TimeoutException;
-import com.gemstone.gemfire.cache.hdfs.internal.FlushObserver.AsyncFlushResult;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue.SortedEventBuffer.BufferIterator;
-import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
-import com.gemstone.gemfire.internal.Assert;
-import com.gemstone.gemfire.internal.cache.AbstractBucketRegionQueue;
-import com.gemstone.gemfire.internal.cache.EntryEventImpl;
-import com.gemstone.gemfire.internal.cache.ForceReattemptException;
-import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-import com.gemstone.gemfire.internal.cache.InternalRegionArguments;
-import com.gemstone.gemfire.internal.cache.LocalRegion;
-import com.gemstone.gemfire.internal.cache.RegionEventImpl;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.ByteComparator;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.CursorIterator;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
-import org.apache.hadoop.hbase.util.Bytes;
-
-
-/**
- * This class holds the sorted list required for HDFS.
- *
- *
- */
-public class HDFSBucketRegionQueue extends AbstractBucketRegionQueue {
- private static final boolean VERBOSE = Boolean.getBoolean("hdfsBucketRegionQueue.VERBOSE");
- private final int batchSize;
- volatile HDFSEventQueue hdfsEventQueue = null;
-
- // set before releasing the primary lock.
- private final AtomicBoolean releasingPrimaryLock = new AtomicBoolean(true);
-
- // This is used to keep track of the current size of the queue in bytes.
- final AtomicLong queueSizeInBytes = new AtomicLong(0);
- public boolean isBucketSorted = true;
- /**
- * @param regionName
- * @param attrs
- * @param parentRegion
- * @param cache
- * @param internalRegionArgs
- */
- public HDFSBucketRegionQueue(String regionName, RegionAttributes attrs,
- LocalRegion parentRegion, GemFireCacheImpl cache,
- InternalRegionArguments internalRegionArgs) {
- super(regionName, attrs, parentRegion, cache, internalRegionArgs);
-
- this.isBucketSorted = internalRegionArgs.getPartitionedRegion().getParallelGatewaySender().getBucketSorted();
- if (isBucketSorted)
- hdfsEventQueue = new MultiRegionSortedQueue();
- else
- hdfsEventQueue = new EventQueue();
-
- batchSize = internalRegionArgs.getPartitionedRegion().
- getParallelGatewaySender().getBatchSize() * 1024 *1024;
- this.keySet();
- }
- @Override
- protected void initialize(InputStream snapshotInputStream,
- InternalDistributedMember imageTarget,
- InternalRegionArguments internalRegionArgs) throws TimeoutException,
- IOException, ClassNotFoundException {
-
- super.initialize(snapshotInputStream, imageTarget, internalRegionArgs);
-
- loadEventsFromTempQueue();
-
- this.initialized = true;
- notifyEventProcessor();
- }
-
- private TreeSet<Long> createSkipListFromMap(Set keySet) {
- TreeSet<Long> sortedKeys = null;
- if (!hdfsEventQueue.isEmpty())
- return sortedKeys;
-
- if (!keySet.isEmpty()) {
- sortedKeys = new TreeSet<Long>(keySet);
- if (!sortedKeys.isEmpty())
- {
- for (Long key : sortedKeys) {
- if (this.isBucketSorted) {
- Object hdfsevent = getNoLRU(key, true, false, false);
- if (hdfsevent == null) { // this can happen when tombstones are recovered.
- if (logger.isDebugEnabled() || VERBOSE) {
- logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Discarding key " + key + ", no event recovered"));
- }
- } else {
- int eventSize = ((HDFSGatewayEventImpl)hdfsevent).
- getSizeOnHDFSInBytes(!this.isBucketSorted);
- hdfsEventQueue.put(key,(HDFSGatewayEventImpl)hdfsevent, eventSize );
- queueSizeInBytes.getAndAdd(eventSize);
- }
- }
- else {
- Object hdfsevent = getNoLRU(key, true, false, false);
- if (hdfsevent != null) { // hdfs event can be null when tombstones are recovered.
- queueSizeInBytes.getAndAdd(((HDFSGatewayEventImpl)hdfsevent).
- getSizeOnHDFSInBytes(!this.isBucketSorted));
- }
- ((EventQueue)hdfsEventQueue).put(key);
- }
-
- }
- getEventSeqNum().setIfGreater(sortedKeys.last());
- }
-
- }
- if (logger.isDebugEnabled() || VERBOSE) {
- logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG,
- "For bucket " + getId() + ", total keys recovered are : " + keySet.size()
- + " and the seqNo is " + getEventSeqNum()));
- }
- return sortedKeys;
- }
-
- @Override
- protected void basicClear(RegionEventImpl ev) {
- super.basicClear(ev);
- queueSizeInBytes.set(0);
- if ( this.getBucketAdvisor().isPrimary()) {
- this.hdfsEventQueue.clear();
- }
- }
-
- protected void clearQueues(){
- queueSizeInBytes.set(0);
- if ( this.getBucketAdvisor().isPrimary()) {
- this.hdfsEventQueue.clear();
- }
- }
-
- @Override
- protected void basicDestroy(final EntryEventImpl event,
- final boolean cacheWrite, Object expectedOldValue)
- throws EntryNotFoundException, CacheWriterException, TimeoutException {
- super.basicDestroy(event, cacheWrite, expectedOldValue);
- }
-
- ArrayList peekABatch() {
- ArrayList result = new ArrayList();
- hdfsEventQueue.peek(result);
- return result;
- }
-
- @Override
- protected void addToEventQueue(Object key, boolean didPut, EntryEventImpl event, int sizeOfHDFSEvent) {
- if (didPut && this.getBucketAdvisor().isPrimary()) {
- HDFSGatewayEventImpl hdfsEvent = (HDFSGatewayEventImpl)event.getValue();
- if (sizeOfHDFSEvent == -1) {
- try {
- // the size is calculated only on primary before event is inserted in the bucket.
- // If this node became primary after size was calculated, sizeOfHDFSEvent will be -1.
- // Try to get the size. #50016
- sizeOfHDFSEvent = hdfsEvent.getSizeOnHDFSInBytes(!((HDFSBucketRegionQueue)this).isBucketSorted);
- } catch (Throwable e) {
- // Ignore any exception while fetching the size.
- sizeOfHDFSEvent = 0;
- }
- }
- queueSizeInBytes.getAndAdd(sizeOfHDFSEvent);
- if (this.initialized) {
- Long longKey = (Long)key;
- this.hdfsEventQueue.put(longKey, hdfsEvent, sizeOfHDFSEvent);
- }
- if (logger.isDebugEnabled()) {
- logger.debug("Put successfully in the queue : " + hdfsEvent + " . Queue initialized: "
- + this.initialized);
- }
- }
- }
-
- /**
- * It removes the first key from the queue.
- *
- * @return Returns the key for which value was destroyed.
- * @throws ForceReattemptException
- */
- public Long remove() throws ForceReattemptException {
- throw new UnsupportedOperationException("Individual entries cannot be removed in a HDFSBucketRegionQueue");
- }
-
- /**
- * It removes the first key from the queue.
- *
- * @return Returns the value.
- * @throws InterruptedException
- * @throws ForceReattemptException
- */
- public Object take() throws InterruptedException, ForceReattemptException {
- throw new UnsupportedOperationException("take() cannot be called for individual entries in a HDFSBucketRegionQueue");
- }
-
- public void destroyKeys(ArrayList<HDFSGatewayEventImpl> listToDestroy) {
-
- HashSet<Long> removedSeqNums = new HashSet<Long>();
-
- for (int index =0; index < listToDestroy.size(); index++) {
- HDFSGatewayEventImpl entry = null;
- if (this.isBucketSorted) {
- // Remove the events in reverse order so that the events with higher sequence number
- // are removed last to ensure consistency.
- entry = listToDestroy.get(listToDestroy.size() - index -1);
- } else {
- entry = listToDestroy.get(index);
- }
-
- try {
- if (this.logger.isDebugEnabled())
- logger.debug("destroying primary key " + entry.getShadowKey() + " bucket id: " + this.getId());
- // removed from peeked list
- boolean deleted = this.hdfsEventQueue.remove(entry);
- if (deleted) {
- // this is an onheap event so a call to size should be ok.
- long entrySize = entry.getSizeOnHDFSInBytes(!this.isBucketSorted);
- destroyKey(entry.getShadowKey());
- long queueSize = queueSizeInBytes.getAndAdd(-1*entrySize);
- if (queueSize < 0) {
- // In HA scenarios, queueSizeInBytes can go awry.
- queueSizeInBytes.compareAndSet(queueSize, 0);
- }
- removedSeqNums.add(entry.getShadowKey());
- }
- }catch (ForceReattemptException e) {
- if (logger.isDebugEnabled()) {
- logger.debug("ParallelGatewaySenderQueue#remove->HDFSBucketRegionQueue#destroyKeys: " + "Got ForceReattemptException for " + this
- + " for bucket = " + this.getId());
- }
- }
- catch(EntryNotFoundException e) {
- if (logger.isDebugEnabled()) {
- logger.debug("ParallelGatewaySenderQueue#remove->HDFSBucketRegionQueue#destroyKeys: " + "Got EntryNotFoundException for " + this
- + " for bucket = " + this.getId() + " and key " + entry.getShadowKey());
- }
- } finally {
- entry.release();
- }
- }
-
- if (this.getBucketAdvisor().isPrimary()) {
- hdfsEventQueue.handleRemainingElements(removedSeqNums);
- }
- }
-
-
- public boolean isReadyForPeek() {
- return !this.isEmpty() && !this.hdfsEventQueue.isEmpty() && getBucketAdvisor().isPrimary();
- }
-
- public long getLastPeekTimeInMillis() {
- return hdfsEventQueue.getLastPeekTimeInMillis();
- }
-
- public long getQueueSizeInBytes() {
- return queueSizeInBytes.get();
- }
- /*
- * This function is called when the bucket takes as the role of primary.
- */
- @Override
- public void beforeAcquiringPrimaryState() {
-
- queueSizeInBytes.set(0);
- if (logger.isDebugEnabled() || VERBOSE) {
- logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG,
- "This node has become primary for bucket " + this.getId() +". " +
- "Creating sorted data structure for the async queue."));
- }
- releasingPrimaryLock.set(false);
-
- // clear the hdfs queue in case it has already elements left if it was a primary
- // in the past
- hdfsEventQueue.clear();
- if (isBucketSorted)
- hdfsEventQueue = new MultiRegionSortedQueue();
- else
- hdfsEventQueue = new EventQueue();
-
- TreeSet<Long> sortedKeys = createSkipListFromMap(this.keySet());
-
- if (sortedKeys != null && sortedKeys.size() > 0) {
- // Mark the events equal to batch size as duplicate.
- // calculate the batch size based on the number of events currently in the queue
- // This is an approximation.
- long batchSizeMB = this.getPartitionedRegion().getParallelGatewaySender().getBatchSize();
- long batchSizeInBytes = batchSizeMB*1024*1024;
- long totalBucketSize = queueSizeInBytes.get();
- totalBucketSize = totalBucketSize > 0 ? totalBucketSize: 1;
- long totalEntriesInBucket = this.entryCount();
- totalEntriesInBucket = totalEntriesInBucket > 0 ? totalEntriesInBucket: 1;
-
- long perEntryApproxSize = totalBucketSize/totalEntriesInBucket;
- perEntryApproxSize = perEntryApproxSize > 0 ? perEntryApproxSize: 1;
-
- int batchSize = (int)(batchSizeInBytes/perEntryApproxSize);
-
- if (logger.isDebugEnabled() || VERBOSE) {
- logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG,
- "Calculating batch size " + " batchSizeMB: " + batchSizeMB + " batchSizeInBytes: " + batchSizeInBytes +
- " totalBucketSize: " + totalBucketSize + " totalEntriesInBucket: " + totalEntriesInBucket +
- " perEntryApproxSize: " + perEntryApproxSize + " batchSize: " + batchSize ));
- }
-
- markEventsAsDuplicate(batchSize, sortedKeys.iterator());
- }
- }
-
- @Override
- public void beforeReleasingPrimaryLockDuringDemotion() {
- queueSizeInBytes.set(0);
- releasingPrimaryLock.set(true);
- // release memory in case of a clean transition
- hdfsEventQueue.clear();
- }
-
- /**
- * This function searches the skip list and the peeked skip list for a given region key
- * @param region
- *
- */
- public HDFSGatewayEventImpl getObjectForRegionKey(Region region, byte[] regionKey) {
- // get can only be called for a sorted queue.
- // Calling get with Long.MIN_VALUE seq number ensures that
- // the list will return the key which has highest seq number.
- return hdfsEventQueue.get(region, regionKey, Long.MIN_VALUE);
- }
-
- /**
- * Get an iterator on the queue, passing in the partitioned region
- * we want to iterate over the events from.
- */
- public SortedEventQueueIterator iterator(Region region) {
- return hdfsEventQueue.iterator(region);
- }
-
- public long totalEntries() {
- return entryCount();
- }
-
- /**
- * Ideally this function should be called from a thread periodically to
- * rollover the skip list when it is above a certain size.
- *
- */
- public void rolloverSkipList() {
- // rollover can only be called for a sorted queue.
- hdfsEventQueue.rollover();
- }
-
- public boolean shouldDrainImmediately() {
- return hdfsEventQueue.getFlushObserver().shouldDrainImmediately();
- }
-
- public AsyncFlushResult flush() {
- if (logger.isDebugEnabled() || VERBOSE) {
- logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Flush requested"));
- }
- return hdfsEventQueue.getFlushObserver().flush();
- }
-
- /**
- * This class keeps the regionkey and seqNum. The objects of this class are
- * kept in a concurrent skip list. The order of elements is decided based on the
- * comparison of regionKey + seqNum. This kind of comparison allows us to keep
- * multiple updates on a single key (becaus it has different seq Num)
- */
- static class KeyToSeqNumObject implements Comparable<KeyToSeqNumObject>
- {
- private byte[] regionkey;
- private Long seqNum;
-
- KeyToSeqNumObject(byte[] regionkey, Long seqNum){
- this.regionkey = regionkey;
- this.seqNum = seqNum;
- }
-
- /**
- * This function compares the key first. If the keys are same then seq num is compared.
- * This function is a key function because it ensures that the skiplists hold the elements
- * in an order we want it to and for multiple updates on key fetches the most recent one
- * Currently we are comparing seq numbers but we will have to change it to version stamps.
- * * List can have elements in following sequence
- * K1 Value1 version : 1
- * K2 Value2a version : 2
- * K2 Value2 version : 1
- * K3 Value3 version : 1
- * For a get on K2, it should retunr K2 Value 2a.
- */
- @Override
- public int compareTo(KeyToSeqNumObject o) {
- int compareOutput = ByteComparator.compareBytes(
- this.getRegionkey(), 0, this.getRegionkey().length, o.getRegionkey(), 0, o.getRegionkey().length);
- if (compareOutput != 0 )
- return compareOutput;
-
- // If the keys are same and this is an object with dummy seq number,
- // return -1. This will ensure that ceiling function on a skip list will enumerate
- // all the entries and return the last one.
- if (this.getSeqNum() == Long.MIN_VALUE)
- return -1;
-
- // this is to just maintain consistency with the above statement.
- if (o.getSeqNum() == Long.MIN_VALUE)
- return 1;
-
- // minus operator pushes entries with lower seq number in the end so that
- // the order as mentioned above is maintained. And the entries with
- // higher version are fetched on a get.
- return this.getSeqNum().compareTo(o.getSeqNum()) * -1;
- }
-
- @Override
- public boolean equals (Object o) {
- KeyToSeqNumObject obj = null;
- if (o == null)
- return false;
-
- if (o instanceof KeyToSeqNumObject)
- obj = (KeyToSeqNumObject)o;
- else
- return false;
-
- if (this.compareTo(obj) != 0)
- return false;
- else
- return true;
- }
-
- public int hashCode() {
- assert false : "hashCode not designed";
- return -1;
- }
-
- byte[] getRegionkey() {
- return regionkey;
- }
-
- public Long getSeqNum() {
- return seqNum;
- }
-
- public void setSeqNum(Long seqNum) {
- this.seqNum = seqNum;
- }
-
- @Override
- public String toString() {
- return EntryEventImpl.deserialize(regionkey) + " {" + seqNum + "}";
- }
- }
-
- public interface HDFSEventQueue {
- FlushObserver getFlushObserver();
-
- /** puts an event in the queue. */
- public void put (long key, HDFSGatewayEventImpl event, int size);
-
- public SortedEventQueueIterator iterator(Region region);
-
- public void rollover();
-
- /** Get a value from the queue
- * @throws IllegalStateException if this queue doesn't support get
- **/
- public HDFSGatewayEventImpl get(Region region, byte[] regionKey,
- long minValue);
-
- // Peeks a batch of size specified by batchSize
- // And add the results to the array list
- public void peek(ArrayList result);
-
- // Checks if there are elements to bee peeked
- public boolean isEmpty();
-
- // removes the event if it has already been peeked.
- public boolean remove(HDFSGatewayEventImpl event);
-
- // take care of the elements that were peeked
- // but were not removed after a batch dispatch
- // due to concurrency effects.
- public void handleRemainingElements(HashSet<Long> listToBeremoved);
-
- // clears the list.
- public void clear();
-
- // get the time when the last peek was done.
- public long getLastPeekTimeInMillis();
- }
-
- class MultiRegionSortedQueue implements HDFSEventQueue {
- ConcurrentMap<String, SortedEventQueue> regionToEventQueue = new ConcurrentHashMap<String, SortedEventQueue>();
- volatile Set<SortedEventQueue> peekedQueues = Collections.EMPTY_SET;
- private final AtomicBoolean peeking = new AtomicBoolean(false);
- long lastPeekTimeInMillis = System.currentTimeMillis();
-
- private final FlushObserver flush = new FlushObserver() {
- @Override
- public AsyncFlushResult flush() {
- final Set<AsyncFlushResult> flushes = new HashSet<AsyncFlushResult>();
- for (SortedEventQueue queue : regionToEventQueue.values()) {
- flushes.add(queue.getFlushObserver().flush());
- }
-
- return new AsyncFlushResult() {
- @Override
- public boolean waitForFlush(long timeout, TimeUnit unit) throws InterruptedException {
- long start = System.nanoTime();
- long remaining = unit.toNanos(timeout);
- for (AsyncFlushResult afr : flushes) {
- if (!afr.waitForFlush(remaining, TimeUnit.NANOSECONDS)) {
- return false;
- }
- remaining -= (System.nanoTime() - start);
- }
- return true;
- }
- };
- }
-
- @Override
- public boolean shouldDrainImmediately() {
- for (SortedEventQueue queue : regionToEventQueue.values()) {
- if (queue.getFlushObserver().shouldDrainImmediately()) {
- return true;
- }
- }
- return false;
- }
- };
-
- @Override
- public FlushObserver getFlushObserver() {
- return flush;
- }
-
- @Override
- public void put(long key, HDFSGatewayEventImpl event, int size) {
-
- String region = event.getRegionPath();
- SortedEventQueue regionQueue = regionToEventQueue.get(region);
- if(regionQueue == null) {
- regionToEventQueue.putIfAbsent(region, new SortedEventQueue());
- regionQueue = regionToEventQueue.get(region);
- }
- regionQueue.put(key, event, size);
- }
-
- @Override
- public void peek(ArrayList result) {
- // The elements that were peeked last time, have not been persisted to HDFS
- // yet. You cannot take out next batch until that is done.
- if (!peeking.compareAndSet(false, true)) {
- if (logger.isTraceEnabled() || VERBOSE) {
- logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Peek already in progress, aborting"));
- }
- return;
- }
- //Maintain a separate set of peeked queues.
- //All of these queues are statefull, and expect to be
- //handleRemainingElements and clear to be called on
- //them iff peek was called on them. However, new queues
- //may be created in that time.
- peekedQueues = Collections.newSetFromMap(new ConcurrentHashMap<SortedEventQueue, Boolean>(regionToEventQueue.size()));
-
- //Peek from all of the existing queues
- for(SortedEventQueue queue : regionToEventQueue.values()) {
- if(!queue.isEmpty()) {
- queue.peek(result);
- peekedQueues.add(queue);
- }
- }
- if (result.isEmpty())
- peeking.set(false);
-
-
- this.lastPeekTimeInMillis = System.currentTimeMillis();
- }
-
- @Override
- public boolean isEmpty() {
- for(SortedEventQueue queue : regionToEventQueue.values()) {
- if(!queue.isEmpty()) {
- return false;
- }
- }
- return true;
- }
-
- @Override
- public boolean remove(HDFSGatewayEventImpl event) {
- String region = event.getRegionPath();
- SortedEventQueue regionQueue = regionToEventQueue.get(region);
- return regionQueue.remove(event);
- }
-
- @Override
- public void handleRemainingElements(HashSet<Long> removedSeqNums){
- for(SortedEventQueue queue : peekedQueues) {
- queue.handleRemainingElements(removedSeqNums);
- }
- peekedQueues.clear();
- peeking.set(false);
- }
-
- @Override
- public void clear() {
- for(SortedEventQueue queue : regionToEventQueue.values()) {
- queue.clear();
- }
- peekedQueues.clear();
- peeking.set(false);
- }
-
- @Override
- public long getLastPeekTimeInMillis() {
- return this.lastPeekTimeInMillis;
- }
-
- @Override
- public HDFSGatewayEventImpl get(Region region, byte[] regionKey,
- long minValue) {
- SortedEventQueue queue = regionToEventQueue.get(region.getFullPath());
- if(queue == null) {
- return null;
- }
- return queue.get(region, regionKey, minValue);
- }
-
- @Override
- public SortedEventQueueIterator iterator(Region region) {
- SortedEventQueue queue = regionToEventQueue.get(region.getFullPath());
- if(queue == null) {
- return new SortedEventQueueIterator(new LinkedBlockingDeque<SortedEventBuffer>());
- }
- return queue.iterator(region);
- }
-
- @Override
- public void rollover() {
- for(SortedEventQueue queue : regionToEventQueue.values()) {
- queue.rollover();
- }
- }
- }
-
- class EventQueue implements HDFSEventQueue {
- private final SignalledFlushObserver flush = new SignalledFlushObserver();
- private final BlockingQueue<Long> eventSeqNumQueue = new LinkedBlockingQueue<Long>();
- private final BlockingQueue<Long> peekedEvents = new LinkedBlockingQueue<Long>();
- private long lastPeekTimeInMillis = System.currentTimeMillis();
-
- public EventQueue() {
-
- }
-
- @Override
- public FlushObserver getFlushObserver() {
- return flush;
- }
-
- @Override
- public void put(long key, HDFSGatewayEventImpl event, int size) {
- put(key);
- }
- public void put (long key) {
- eventSeqNumQueue.add(key);
- flush.push();
- incQueueSize();
- }
-
-
- @Override
- public HDFSGatewayEventImpl get(Region region, byte[] regionKey,
- long minValue) {
- throw new InternalGemFireError("Get not supported on unsorted queue");
- }
-
- @Override
- public void peek(ArrayList peekedEntries) {
- if (peekedEvents.size() != 0) {
- return;
- }
-
- for(int size=0; size < batchSize; ) {
- Long seqNum = eventSeqNumQueue.peek();
- if (seqNum == null) {
- // queue is now empty, return
- break;
- }
- Object object = getNoLRU(seqNum, true, false, false);
- if (object != null) {
- peekedEvents.add(seqNum);
- size += ((HDFSGatewayEventImpl)object).getSizeOnHDFSInBytes(!isBucketSorted);
- peekedEntries.add(object);
-
- } else {
- logger.debug("The entry corresponding to the sequence number " +
- seqNum + " is missing. This can happen when an entry is already" +
- "dispatched before a bucket moved.");
- // event is being ignored. Decrease the queue size
- decQueueSize();
- flush.pop(1);
-
- }
- eventSeqNumQueue.poll();
-
- }
- this.lastPeekTimeInMillis = System.currentTimeMillis();
- }
-
- @Override
- public boolean isEmpty() {
- return eventSeqNumQueue.isEmpty();
- }
-
-
- @Override
- public boolean remove(HDFSGatewayEventImpl event) {
- boolean deleted = peekedEvents.remove(event.getShadowKey());
- if (deleted)
- decQueueSize();
- return deleted;
- }
-
- @Override
- // It looks like that there is no need for this function
- // in EventQueue.
- public void handleRemainingElements(HashSet<Long> removedSeqNums) {
- flush.pop(removedSeqNums.size());
- eventSeqNumQueue.addAll(peekedEvents);
- peekedEvents.clear();
- }
-
- @Override
- public void clear() {
- flush.clear();
- decQueueSize(eventSeqNumQueue.size());
- eventSeqNumQueue.clear();
- decQueueSize(peekedEvents.size());
- peekedEvents.clear();
- }
-
- @Override
- public long getLastPeekTimeInMillis() {
- return this.lastPeekTimeInMillis;
- }
- @Override
- public SortedEventQueueIterator iterator(Region region) {
- throw new InternalGemFireError("not supported on unsorted queue");
- }
- @Override
- public void rollover() {
- throw new InternalGemFireError("not supported on unsorted queue");
- }
- }
-
- class SortedEventQueue implements HDFSEventQueue {
- private final SignalledFlushObserver flush = new SignalledFlushObserver();
-
- // List of all the skip lists that hold the data
- final Deque<SortedEventBuffer> queueOfLists =
- new LinkedBlockingDeque<SortedEventBuffer>();
-
- // This points to the tail of the queue
- volatile SortedEventBuffer currentSkipList = new SortedEventBuffer();
-
- private final AtomicBoolean peeking = new AtomicBoolean(false);
-
- private long lastPeekTimeInMillis = System.currentTimeMillis();
-
- public SortedEventQueue() {
- queueOfLists.add(currentSkipList);
- }
-
- @Override
- public FlushObserver getFlushObserver() {
- return flush;
- }
-
- public boolean remove(HDFSGatewayEventImpl event) {
- SortedEventBuffer eventBuffer = queueOfLists.peek();
- if (eventBuffer != null) {
- return eventBuffer.copyToBuffer(event);
- }
- else {
- // This can happen when the queue is cleared because of bucket movement
- // before the remove is called.
- return true;
- }
- }
-
- public void clear() {
- flush.clear();
- for (SortedEventBuffer buf : queueOfLists) {
- decQueueSize(buf.size());
- buf.clear();
- }
-
- queueOfLists.clear();
- rollList(false);
-
- peeking.set(false);
- }
-
- public boolean isEmpty() {
- if (queueOfLists.size() == 1)
- return queueOfLists.peek().isEmpty();
- return false;
- }
-
- public void put(long key, HDFSGatewayEventImpl event, int eventSize) {
- if (logger.isTraceEnabled() || VERBOSE) {
- logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Inserting key " + event + " into list " + System.identityHashCode(currentSkipList)));
- }
- putInList(new KeyToSeqNumObject(((HDFSGatewayEventImpl)event).getSerializedKey(), key),
- eventSize);
- }
-
- private void putInList(KeyToSeqNumObject entry, int sizeInBytes) {
- // It was observed during testing that peek can start peeking
- // elements from a list to which a put is happening. This happens
- // when the peek changes the value of currentSkiplist to a new list
- // but the put continues to write to an older list.
- // So there is a possibility that an element is added to the list
- // that has already been peeked. To handle this case, in handleRemainingElements
- // function we re-add the elements that were not peeked.
- if (currentSkipList.add(entry, sizeInBytes) == null) {
- flush.push();
- incQueueSize();
- }
- }
-
- public void rollover(boolean forceRollover) {
- if (currentSkipList.bufferSize() >= batchSize || forceRollover) {
- rollList(forceRollover);
- }
- }
-
- /**
- * Ideally this function should be called from a thread periodically to
- * rollover the skip list when it is above a certain size.
- *
- */
- public void rollover() {
- rollover(false);
- }
-
- public void peek(ArrayList peekedEntries) {
- // The elements that were peeked last time, have not been persisted to HDFS
- // yet. You cannot take out next batch until that is done.
- if (!peeking.compareAndSet(false, true)) {
- if (logger.isTraceEnabled() || VERBOSE) {
- logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Peek already in progress, aborting"));
- }
- return;
- }
-
- if (queueOfLists.size() == 1) {
- rollList(false);
- }
-
- Assert.assertTrue(queueOfLists.size() > 1, "Cannot peek from head of queue");
- BufferIterator itr = queueOfLists.peek().iterator();
- while (itr.hasNext()) {
- KeyToSeqNumObject entry = itr.next();
- if (logger.isTraceEnabled() || VERBOSE) {
- logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Peeking key " + entry + " from list " + System.identityHashCode(queueOfLists.peek())));
- }
-
- HDFSGatewayEventImpl ev = itr.value();
- ev.copyOffHeapValue();
- peekedEntries.add(ev);
- }
-
- // discard an empty batch as it is not processed and will plug up the
- // queue
- if (peekedEntries.isEmpty()) {
- SortedEventBuffer empty = queueOfLists.remove();
- if (logger.isTraceEnabled() || VERBOSE) {
- logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Discarding empty batch " + empty));
- }
- peeking.set(false);
- }
- this.lastPeekTimeInMillis = System.currentTimeMillis();
- }
-
- public HDFSGatewayEventImpl get(Region region, byte[] regionKey, long key) {
- KeyToSeqNumObject event = new KeyToSeqNumObject(regionKey, key);
- Iterator<SortedEventBuffer> queueIterator = queueOfLists.descendingIterator();
- while (queueIterator.hasNext()) {
- HDFSGatewayEventImpl evt = queueIterator.next().getFromQueueOrBuffer(event);
- if (evt != null) {
- return evt;
- }
- }
- return null;
- }
-
- public void handleRemainingElements(HashSet<Long> removedSeqNums) {
- if (!peeking.get()) {
- if (logger.isTraceEnabled() || VERBOSE) {
- logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Not peeked, just cleaning up empty batch; current list is " + currentSkipList));
- }
- return;
- }
-
- Assert.assertTrue(queueOfLists.size() > 1, "Cannot remove only event list");
-
- // all done with the peeked elements, okay to throw away now
- SortedEventBuffer buf = queueOfLists.remove();
- SortedEventBuffer.BufferIterator bufIter = buf.iterator();
- // Check if the removed buffer has any extra events. If yes, check if these extra
- // events are part of region. If yes, reinsert these as they were probably inserted
- // into this list while it was being peeked.
- while (bufIter.hasNext()) {
- KeyToSeqNumObject key = bufIter.next();
- if (!removedSeqNums.contains(key.getSeqNum())) {
- HDFSGatewayEventImpl evt = (HDFSGatewayEventImpl) getNoLRU(key.getSeqNum(), true, false, false);
- if (evt != null) {
- flush.push();
- incQueueSize();
- queueOfLists.getFirst().add(key, evt.getSizeOnHDFSInBytes(!isBucketSorted));
- }
- }
- }
-
- decQueueSize(buf.size());
- flush.pop(buf.size());
- peeking.set(false);
- }
-
- public long getLastPeekTimeInMillis(){
- return this.lastPeekTimeInMillis;
- }
-
- NavigableSet<KeyToSeqNumObject> getPeeked() {
- assert peeking.get();
- return queueOfLists.peek().keySet();
- }
-
- private synchronized void rollList(boolean forceRollover) {
- if (currentSkipList.bufferSize() < batchSize && queueOfLists.size() > 1 && !forceRollover)
- return;
- SortedEventBuffer tmp = new SortedEventBuffer();
- queueOfLists.add(tmp);
- if (logger.isTraceEnabled() || VERBOSE) {
- logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Rolling over list from " + currentSkipList + " to list " + tmp));
- }
- currentSkipList = tmp;
- }
-
- @Override
- public SortedEventQueueIterator iterator(Region region) {
- return new SortedEventQueueIterator(queueOfLists);
- }
- }
-
- public class SortedEventBuffer {
- private final HDFSGatewayEventImpl NULL = new HDFSGatewayEventImpl();
-
- private final ConcurrentSkipListMap<KeyToSeqNumObject, HDFSGatewayEventImpl> events = new ConcurrentSkipListMap<KeyToSeqNumObject, HDFSGatewayEventImpl>();
-
- private int bufferSize = 0;
-
- public boolean copyToBuffer(HDFSGatewayEventImpl event) {
- KeyToSeqNumObject key = new KeyToSeqNumObject(event.getSerializedKey(), event.getShadowKey());
- if (events.containsKey(key)) {
- // After an event has been delivered in a batch, we copy it into the
- // buffer so that it can be returned by an already in progress iterator.
- // If we do not do this it is possible to miss events since the hoplog
- // iterator uses a fixed set of files that are determined when the
- // iterator is created. The events will be GC'd once the buffer is no
- // longer strongly referenced.
- HDFSGatewayEventImpl oldVal = events.put(key, event);
- assert oldVal == NULL;
-
- return true;
- }
- // If the primary lock is being relinquished, the events is cleared and probaly that is
- // why we are here. return true if the primary lock is being relinquished
- if (releasingPrimaryLock.get())
- return true;
- else
- return false;
- }
-
- public HDFSGatewayEventImpl getFromQueueOrBuffer(KeyToSeqNumObject key) {
- KeyToSeqNumObject result = events.ceilingKey(key);
- if (result != null && Bytes.compareTo(key.getRegionkey(), result.getRegionkey()) == 0) {
-
- // first try to fetch the buffered event to make it fast.
- HDFSGatewayEventImpl evt = events.get(result);
- if (evt != NULL) {
- return evt;
- }
- // now try to fetch the event from the queue region
- evt = (HDFSGatewayEventImpl) getNoLRU(result.getSeqNum(), true, false, false);
- if (evt != null) {
- return evt;
- }
-
- // try to fetch again from the buffered events to avoid a race between
- // item deletion and the above two statements.
- evt = events.get(result);
- if (evt != NULL) {
- return evt;
- }
-
- }
- return null;
- }
-
- public HDFSGatewayEventImpl add(KeyToSeqNumObject key, int sizeInBytes) {
- bufferSize += sizeInBytes;
- return events.put(key, NULL);
- }
-
- public void clear() {
- events.clear();
- }
-
- public boolean isEmpty() {
- return events.isEmpty();
- }
-
- public int bufferSize() {
- return bufferSize;
- }
- public int size() {
- return events.size();
- }
- public NavigableSet<KeyToSeqNumObject> keySet() {
- return events.keySet();
- }
-
- public BufferIterator iterator() {
- return new BufferIterator(events.keySet().iterator());
- }
-
- public class BufferIterator implements Iterator<KeyToSeqNumObject> {
- private final Iterator<KeyToSeqNumObject> src;
-
- private KeyToSeqNumObject currentKey;
- private HDFSGatewayEventImpl currentVal;
-
- private KeyToSeqNumObject nextKey;
- private HDFSGatewayEventImpl nextVal;
-
- public BufferIterator(Iterator<KeyToSeqNumObject> src) {
- this.src = src;
- moveNext();
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean hasNext() {
- return nextVal != null;
- }
-
- @Override
- public KeyToSeqNumObject next() {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
-
- currentKey = nextKey;
- currentVal = nextVal;
-
- moveNext();
-
- return currentKey;
- }
-
- public KeyToSeqNumObject key() {
- assert currentKey != null;
- return currentKey;
- }
-
- public HDFSGatewayEventImpl value() {
- assert currentVal != null;
- return currentVal;
- }
-
- private void moveNext() {
- while (src.hasNext()) {
- nextKey = src.next();
- nextVal = getFromQueueOrBuffer(nextKey);
- if (nextVal != null) {
- return;
- } else if (logger.isDebugEnabled() || VERBOSE) {
- logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "The entry corresponding to"
- + " the sequence number " + nextKey.getSeqNum()
- + " is missing. This can happen when an entry is already"
- + " dispatched before a bucket moved."));
- }
- }
- nextKey = null;
- nextVal = null;
- }
- }
- }
-
- public final class SortedEventQueueIterator implements CursorIterator<HDFSGatewayEventImpl> {
- /** the iterators to merge */
- private final List<SortedEventBuffer.BufferIterator> iters;
-
- /** the current iteration value */
- private HDFSGatewayEventImpl value;
-
- public SortedEventQueueIterator(Deque<SortedEventBuffer> queueOfLists) {
- iters = new ArrayList<SortedEventBuffer.BufferIterator>();
- for (Iterator<SortedEventBuffer> iter = queueOfLists.descendingIterator(); iter.hasNext();) {
- SortedEventBuffer.BufferIterator buf = iter.next().iterator();
- if (buf.hasNext()) {
- buf.next();
- iters.add(buf);
- }
- }
- }
-
- public void close() {
- value = null;
- iters.clear();
- }
-
- @Override
- public boolean hasNext() {
- return !iters.isEmpty();
- }
-
- @Override
- public HDFSGatewayEventImpl next() {
- if (!hasNext()) {
- throw new UnsupportedOperationException();
- }
-
- int diff = 0;
- KeyToSeqNumObject min = null;
- SortedEventBuffer.BufferIterator cursor = null;
-
- for (Iterator<SortedEventBuffer.BufferIterator> merge = iters.iterator(); merge.hasNext(); ) {
- SortedEventBuffer.BufferIterator buf = merge.next();
- KeyToSeqNumObject tmp = buf.key();
- if (min == null || (diff = Bytes.compareTo(tmp.regionkey, min.regionkey)) < 0) {
- min = tmp;
- cursor = buf;
-
- } else if (diff == 0 && !advance(buf, min)) {
- merge.remove();
- }
- }
-
- value = cursor.value();
- assert value != null;
-
- if (!advance(cursor, min)) {
- iters.remove(cursor);
- }
- return current();
- }
-
- @Override
- public final HDFSGatewayEventImpl current() {
- return value;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- private boolean advance(SortedEventBuffer.BufferIterator iter, KeyToSeqNumObject key) {
- while (iter.hasNext()) {
- if (Bytes.compareTo(iter.next().regionkey, key.regionkey) > 0) {
- return true;
- }
- }
- return false;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEntriesSet.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEntriesSet.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEntriesSet.java
deleted file mode 100644
index c8b7b28..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEntriesSet.java
+++ /dev/null
@@ -1,329 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import java.io.IOException;
-import java.lang.ref.ReferenceQueue;
-import java.lang.ref.WeakReference;
-import java.util.AbstractSet;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue.SortedEventQueueIterator;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogSetReader.HoplogIterator;
-import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
-import com.gemstone.gemfire.internal.cache.BucketRegion;
-import com.gemstone.gemfire.internal.cache.EntryEventImpl;
-import com.gemstone.gemfire.internal.cache.HDFSRegionMap;
-import com.gemstone.gemfire.internal.cache.KeyWithRegionContext;
-import com.gemstone.gemfire.internal.cache.LocalRegion.IteratorType;
-import com.gemstone.gemfire.internal.cache.PrimaryBucketException;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import org.apache.hadoop.hbase.util.Bytes;
-
-@SuppressWarnings("rawtypes")
-public class HDFSEntriesSet extends AbstractSet {
- private final IteratorType type;
-
- private final HoplogOrganizer hoplogs;
- private final HDFSBucketRegionQueue brq;
-
- private final BucketRegion region;
- private final ReferenceQueue<HDFSIterator> refs;
-
- public HDFSEntriesSet(BucketRegion region, HDFSBucketRegionQueue brq,
- HoplogOrganizer hoplogs, IteratorType type, ReferenceQueue<HDFSIterator> refs) {
- this.region = region;
- this.brq = brq;
- this.hoplogs = hoplogs;
- this.type = type;
- this.refs = refs;
- }
-
- @Override
- public HDFSIterator iterator() {
- HDFSIterator iter = new HDFSIterator(type, region.getPartitionedRegion(), true);
- if (refs != null) {
- // we can't rely on an explicit close but we need to free resources
- //
- // This approach has the potential to cause excessive memory load and/or
- // GC problems if an app holds an iterator ref too long. A lease-based
- // approach where iterators are automatically for X secs of inactivity is
- // a potential alternative (but may require tuning for certain
- // applications)
- new WeakReference<HDFSEntriesSet.HDFSIterator>(iter, refs);
- }
- return iter;
- }
-
- @Override
- public int size() {
- // TODO this is the tortoise version, need a fast version for estimation
- // note: more than 2^31-1 records will cause this counter to wrap
- int size = 0;
- HDFSIterator iter = new HDFSIterator(null, region.getPartitionedRegion(), false);
- try {
- while (iter.hasNext()) {
- if (includeEntry(iter.next())) {
- size++;
- }
- }
- } finally {
- iter.close();
- }
- return size;
- }
-
- @Override
- public boolean isEmpty() {
- HDFSIterator iter = new HDFSIterator(null, region.getPartitionedRegion(), false);
- try {
- while (iter.hasNext()) {
- if (includeEntry(iter.next())) {
- return false;
- }
- }
- } finally {
- iter.close();
- }
- return true;
- }
-
- private boolean includeEntry(Object val) {
- if (val instanceof HDFSGatewayEventImpl) {
- HDFSGatewayEventImpl evt = (HDFSGatewayEventImpl) val;
- if (evt.getOperation().isDestroy()) {
- return false;
- }
- } else if (val instanceof PersistedEventImpl) {
- PersistedEventImpl evt = (PersistedEventImpl) val;
- if (evt.getOperation().isDestroy()) {
- return false;
- }
- }
- return true;
- }
-
- public class HDFSIterator implements Iterator {
- private final IteratorType type;
- private final boolean deserialize;
-
- private final SortedEventQueueIterator queue;
- private final HoplogIterator<byte[], SortedHoplogPersistedEvent> hdfs;
- private Iterator txCreatedEntryIterator;
-
- private boolean queueNext;
- private boolean hdfsNext;
- private boolean forUpdate;
- private boolean hasTxEntry;
-
- private byte[] currentHdfsKey;
-
- public HDFSIterator(IteratorType type, Region region, boolean deserialize) {
- this.type = type;
- this.deserialize = deserialize;
-
- // Check whether the queue has become primary here.
- // There could be some time between bucket becoming a primary
- // and underlying queue becoming a primary, so isPrimaryWithWait()
- // waits for some time for the queue to become a primary on this member
- if (!brq.getBucketAdvisor().isPrimaryWithWait()) {
- InternalDistributedMember primaryHolder = brq.getBucketAdvisor()
- .basicGetPrimaryMember();
- throw new PrimaryBucketException("Bucket " + brq.getName()
- + " is not primary. Current primary holder is " + primaryHolder);
- }
- // We are deliberating NOT sync'ing while creating the iterators. If done
- // in the correct order, we may get duplicates (due to an in-progress
- // flush) but we won't miss any entries. The dupes will be eliminated
- // during iteration.
- queue = brq.iterator(region);
- advanceQueue();
-
- HoplogIterator<byte[], SortedHoplogPersistedEvent> tmp = null;
- try {
- tmp = hoplogs.scan();
- } catch (IOException e) {
- HDFSEntriesSet.this.region.checkForPrimary();
- throw new HDFSIOException(LocalizedStrings.HOPLOG_FAILED_TO_READ_HDFS_FILE.toLocalizedString(e.getMessage()), e);
- }
-
- hdfs = tmp;
- if (hdfs != null) {
- advanceHdfs();
- }
- }
-
- @Override
- public boolean hasNext() {
- boolean nonTxHasNext = hdfsNext || queueNext;
- if (!nonTxHasNext && this.txCreatedEntryIterator != null) {
- this.hasTxEntry = this.txCreatedEntryIterator.hasNext();
- return this.hasTxEntry;
- }
- return nonTxHasNext;
- }
-
- @Override
- public Object next() {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
- if (hasTxEntry) {
- hasTxEntry = false;
- return this.txCreatedEntryIterator.next();
- }
-
- Object val;
- if (!queueNext) {
- val = getFromHdfs();
- advanceHdfs();
-
- } else if (!hdfsNext) {
- val = getFromQueue();
- advanceQueue();
-
- } else {
- byte[] qKey = queue.current().getSerializedKey();
- byte[] hKey = this.currentHdfsKey;
-
- int diff = Bytes.compareTo(qKey, hKey);
- if (diff < 0) {
- val = getFromQueue();
- advanceQueue();
-
- } else if (diff == 0) {
- val = getFromQueue();
- advanceQueue();
-
- // ignore the duplicate
- advanceHdfs();
-
- } else {
- val = getFromHdfs();
- advanceHdfs();
- }
- }
- return val;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- public void close() {
- if (queueNext) {
- queue.close();
- }
-
- if (hdfsNext) {
- hdfs.close();
- }
- }
-
- private Object getFromQueue() {
- HDFSGatewayEventImpl evt = queue.current();
- if (type == null) {
- return evt;
- }
-
- switch (type) {
- case KEYS:
- byte[] key = evt.getSerializedKey();
- return deserialize ? EntryEventImpl.deserialize(key) : key;
-
- case VALUES:
- return evt.getValue();
-
- default:
- Object keyObj = EntryEventImpl.deserialize(evt.getSerializedKey());
- if(keyObj instanceof KeyWithRegionContext) {
- ((KeyWithRegionContext)keyObj).setRegionContext(region.getPartitionedRegion());
- }
- return ((HDFSRegionMap) region.getRegionMap()).getDelegate().getEntryFromEvent(keyObj, evt, true, forUpdate);
- }
- }
-
- private Object getFromHdfs() {
- if (type == null) {
- return hdfs.getValue();
- }
-
- switch (type) {
- case KEYS:
- byte[] key = this.currentHdfsKey;
- return deserialize ? EntryEventImpl.deserialize(key) : key;
-
- case VALUES:
- PersistedEventImpl evt = hdfs.getValue();
- return evt.getValue();
-
- default:
- Object keyObj = EntryEventImpl.deserialize(this.currentHdfsKey);
- if(keyObj instanceof KeyWithRegionContext) {
- ((KeyWithRegionContext)keyObj).setRegionContext(region.getPartitionedRegion());
- }
- return ((HDFSRegionMap) region.getRegionMap()).getDelegate().getEntryFromEvent(keyObj, hdfs.getValue(), true, forUpdate);
- }
- }
-
- private void advanceHdfs() {
- if (hdfsNext = hdfs.hasNext()) {
- try {
- this.currentHdfsKey = hdfs.next();
- } catch (IOException e) {
- region.checkForPrimary();
- throw new HDFSIOException(LocalizedStrings.HOPLOG_FAILED_TO_READ_HDFS_FILE.toLocalizedString(e.getMessage()), e);
- }
- } else {
- this.currentHdfsKey = null;
- hdfs.close();
- }
- }
-
- private void advanceQueue() {
- if (queueNext = queue.hasNext()) {
- queue.next();
- } else {
- brq.checkForPrimary();
- queue.close();
- }
- }
-
- public void setForUpdate(){
- this.forUpdate = true;
- }
-
- /**MergeGemXDHDFSToGFE not sure of this function is required */
- /*public void setTXState(TXState txState) {
- TXRegionState txr = txState.getTXRegionState(region);
- if (txr != null) {
- txr.lock();
- try {
- this.txCreatedEntryIterator = txr.getCreatedEntryKeys().iterator();
- }
- finally{
- txr.unlock();
- }
- }
- }*/
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventListener.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventListener.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventListener.java
deleted file mode 100644
index 607650f..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventListener.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import com.gemstone.gemfire.cache.CacheClosedException;
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.asyncqueue.AsyncEvent;
-import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer;
-import com.gemstone.gemfire.i18n.LogWriterI18n;
-import com.gemstone.gemfire.i18n.StringId;
-import com.gemstone.gemfire.internal.cache.BucketRegion;
-import com.gemstone.gemfire.internal.cache.ForceReattemptException;
-import com.gemstone.gemfire.internal.cache.PartitionedRegion;
-import com.gemstone.gemfire.internal.cache.PrimaryBucketException;
-import com.gemstone.gemfire.internal.cache.execute.BucketMovedException;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-
-
-/**
- * Listener that persists events to HDFS
- *
- */
-public class HDFSEventListener implements AsyncEventListener {
- private final LogWriterI18n logger;
- private volatile boolean senderStopped = false;
-
- private final FailureTracker failureTracker = new FailureTracker(10L, 60 * 1000L, 1.5f);
-
- public HDFSEventListener(LogWriterI18n logger) {
- this.logger = logger;
- }
-
- @Override
- public void close() {
- senderStopped = true;
- }
-
- @Override
- public boolean processEvents(List<AsyncEvent> events) {
- if (Hoplog.NOP_WRITE) {
- return true;
- }
-
- // The list of events that async queue receives are sorted at the
- // bucket level. Events for multiple regions are concatenated together.
- // Events for multiple buckets are sent which are concatenated
- // one after the other for e.g.
- //
- // <Region1, Key1, bucket1>, <Region1, Key19, bucket1>,
- // <Region1, Key4, bucket2>, <Region1, Key6, bucket2>
- // <Region2, Key1, bucket1>, <Region2, Key4, bucket1>
- // ..
-
- Region previousRegion = null;
- int prevBucketId = -1;
- ArrayList<QueuedPersistentEvent> list = null;
- boolean success = false;
- try {
- //Back off if we are experiencing failures
- failureTracker.sleepIfRetry();
-
- HoplogOrganizer bucketOrganizer = null;
- for (AsyncEvent asyncEvent : events) {
- if (senderStopped){
- failureTracker.failure();
- if (logger.fineEnabled()) {
- logger.fine("HDFSEventListener.processEvents: Cache is closing down. Ignoring the batch of data.");
- }
- return false;
- }
- HDFSGatewayEventImpl hdfsEvent = (HDFSGatewayEventImpl)asyncEvent;
- Region region = hdfsEvent.getRegion();
-
- if (prevBucketId != hdfsEvent.getBucketId() || region != previousRegion){
- if (prevBucketId != -1) {
- bucketOrganizer.flush(list.iterator(), list.size());
- success=true;
- if (logger.fineEnabled()) {
- logger.fine("Batch written to HDFS of size " + list.size() + " for region " + previousRegion);
- }
- }
- bucketOrganizer = getOrganizer((PartitionedRegion) region, hdfsEvent.getBucketId());
- // Bucket organizer can be null only when the bucket has moved. throw an exception so that the
- // batch is discarded.
- if (bucketOrganizer == null)
- throw new BucketMovedException("Bucket moved. BucketId: " + hdfsEvent.getBucketId() + " HDFSRegion: " + region.getName());
- list = new ArrayList<QueuedPersistentEvent>();
- }
- try {
- //TODO:HDFS check if there is any serialization overhead
- list.add(new SortedHDFSQueuePersistedEvent(hdfsEvent));
- } catch (ClassNotFoundException e) {
- //TODO:HDFS add localized string
- logger.warning(new StringId(0, "Error while converting HDFSGatewayEvent to PersistedEventImpl."), e);
- return false;
- }
- prevBucketId = hdfsEvent.getBucketId();
- previousRegion = region;
-
- }
- if (bucketOrganizer != null) {
- bucketOrganizer.flush(list.iterator(), list.size());
- success = true;
-
- if (logger.fineEnabled()) {
- logger.fine("Batch written to HDFS of size " + list.size() + " for region " + previousRegion);
- }
- }
- } catch (IOException e) {
- logger.warning(LocalizedStrings.HOPLOG_FLUSH_FOR_BATCH_FAILED, e);
- return false;
- }
- catch (ForceReattemptException e) {
- if (logger.fineEnabled())
- logger.fine(e);
- return false;
- }
- catch(PrimaryBucketException e) {
- //do nothing, the bucket is no longer primary so we shouldn't get the same
- //batch next time.
- if (logger.fineEnabled())
- logger.fine(e);
- return false;
- }
- catch(BucketMovedException e) {
- //do nothing, the bucket is no longer primary so we shouldn't get the same
- //batch next time.
- if (logger.fineEnabled())
- logger.fine(e);
- return false;
- }
- catch (CacheClosedException e) {
- if (logger.fineEnabled())
- logger.fine(e);
- // exit silently
- return false;
- } catch (InterruptedException e1) {
- if (logger.fineEnabled())
- logger.fine(e1);
- return false;
- } finally {
- failureTracker.record(success);
- }
-
- return true;
- }
-
- private HoplogOrganizer getOrganizer(PartitionedRegion region, int bucketId) {
- BucketRegion br = region.getDataStore().getLocalBucketById(bucketId);
- if (br == null) {
- // got rebalanced or something
- throw new PrimaryBucketException("Bucket region is no longer available " + bucketId + region);
- }
-
- return br.getHoplogOrganizer();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventQueueFilter.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventQueueFilter.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventQueueFilter.java
deleted file mode 100644
index 0860e75..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventQueueFilter.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import com.gemstone.gemfire.cache.Operation;
-import com.gemstone.gemfire.cache.wan.GatewayEventFilter;
-import com.gemstone.gemfire.cache.wan.GatewayQueueEvent;
-import com.gemstone.gemfire.i18n.LogWriterI18n;
-
-/**
- * Current use of this class is limited to ignoring the Bulk DML operations.
- *
- *
- */
-public class HDFSEventQueueFilter implements GatewayEventFilter{
- private LogWriterI18n logger;
-
- public HDFSEventQueueFilter(LogWriterI18n logger) {
- this.logger = logger;
- }
- @Override
- public void close() {
-
- }
-
- @Override
- public boolean beforeEnqueue(GatewayQueueEvent event) {
- Operation op = event.getOperation();
-
-
- /* MergeGemXDHDFSToGFE - Disabled as it is gemxd specific
- if (op == Operation.BULK_DML_OP) {
- // On accessors there are no parallel queues, so with the
- // current logic, isSerialWanEnabled function in LocalRegion
- // always returns true on an accessor. So when a bulk dml
- // op is fired on accessor, this behavior results in distribution
- // of the bulk dml operation to other members. To avoid putting
- // of this bulk dml in parallel queues, added this filter. This
- // is not the efficient way as the filters are used before inserting
- // in the queue. The bulk dmls should be blocked before they are distributed.
- if (logger.fineEnabled())
- logger.fine( "HDFSEventQueueFilter:beforeEnqueue: Disallowing insertion of a bulk DML in HDFS queue.");
- return false;
- }*/
-
- return true;
- }
-
- @Override
- public boolean beforeTransmit(GatewayQueueEvent event) {
- // No op
- return true;
- }
-
- @Override
- public void afterAcknowledgement(GatewayQueueEvent event) {
- // No op
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSGatewayEventImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSGatewayEventImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSGatewayEventImpl.java
deleted file mode 100644
index db99e7e..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSGatewayEventImpl.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import com.gemstone.gemfire.DataSerializer;
-import com.gemstone.gemfire.cache.EntryEvent;
-import com.gemstone.gemfire.internal.InternalDataSerializer;
-import com.gemstone.gemfire.internal.cache.EntryEventImpl;
-import com.gemstone.gemfire.internal.cache.EnumListenerEvent;
-import com.gemstone.gemfire.internal.cache.LocalRegion;
-import com.gemstone.gemfire.internal.cache.lru.Sizeable;
-import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerHelper;
-import com.gemstone.gemfire.internal.cache.versions.VersionTag;
-import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl;
-import com.gemstone.gemfire.internal.offheap.StoredObject;
-import com.gemstone.gemfire.internal.offheap.annotations.Retained;
-import com.gemstone.gemfire.internal.util.BlobHelper;
-
-
-/**
- * Gateway event extended for HDFS functionality
- *
- */
-public class HDFSGatewayEventImpl extends GatewaySenderEventImpl {
-
- private static final long serialVersionUID = 4642852957292192406L;
- protected transient boolean keyIsSerialized = false;
- protected byte[] serializedKey = null;
- protected VersionTag versionTag;
-
- public HDFSGatewayEventImpl(){
- }
-
- @Retained
- public HDFSGatewayEventImpl(EnumListenerEvent operation, EntryEvent event,
- Object substituteValue)
- throws IOException {
- super(operation, event, substituteValue);
- initializeHDFSGatewayEventObject(event);
- }
-
- @Retained
- public HDFSGatewayEventImpl(EnumListenerEvent operation, EntryEvent event,
- Object substituteValue, boolean initialize, int bucketId) throws IOException {
- super(operation, event,substituteValue, initialize, bucketId);
- initializeHDFSGatewayEventObject(event);
- }
-
- @Retained
- public HDFSGatewayEventImpl(EnumListenerEvent operation, EntryEvent event,
- Object substituteValue, boolean initialize) throws IOException {
- super(operation, event, substituteValue, initialize);
- initializeHDFSGatewayEventObject(event);
- }
-
- protected HDFSGatewayEventImpl(HDFSGatewayEventImpl offHeapEvent) {
- super(offHeapEvent);
- this.keyIsSerialized = offHeapEvent.keyIsSerialized;
- this.serializedKey = offHeapEvent.serializedKey;
- this.versionTag = offHeapEvent.versionTag;
- }
-
- @Override
- protected GatewaySenderEventImpl makeCopy() {
- return new HDFSGatewayEventImpl(this);
- }
-
- private void initializeHDFSGatewayEventObject(EntryEvent event)
- throws IOException {
-
- serializeKey();
- versionTag = ((EntryEventImpl)event).getVersionTag();
- if (versionTag != null && versionTag.getMemberID() == null) {
- versionTag.setMemberID(((LocalRegion)getRegion()).getVersionMember());
- }
- }
-
- private void serializeKey() throws IOException {
- if (!keyIsSerialized && isInitialized())
- {
- this.serializedKey = CacheServerHelper.serialize(this.key);
- keyIsSerialized = true;
- }
- }
- /**MergeGemXDHDFSToGFE This function needs to enabled if similar functionality is added to gatewaysendereventimpl*/
- /*@Override
- protected StoredObject obtainOffHeapValueBasedOnOp(EntryEventImpl event,
- boolean hasNonWanDispatcher) {
- return event.getOffHeapNewValue();
- }*/
-
- /**MergeGemXDHDFSToGFE This function needs to enabled if similar functionality is added to gatewaysendereventimpl*/
- /*@Override
- protected Object obtainHeapValueBasedOnOp(EntryEventImpl event,
- boolean hasNonWanDispatcher) {
- return event.getRawNewValue(shouldApplyDelta());
- }*/
-
- @Override
- protected boolean shouldApplyDelta() {
- return true;
- }
-
-
- @Override
- public void toData(DataOutput out) throws IOException {
- super.toData(out);
- DataSerializer.writeObject(this.versionTag, out);
-
- }
-
- @Override
- protected void serializeKey(DataOutput out) throws IOException {
- DataSerializer.writeByteArray((byte[])this.serializedKey, out);
- }
-
- @Override
- public void fromData(DataInput in) throws IOException, ClassNotFoundException {
- super.fromData(in);
- this.versionTag = (VersionTag)DataSerializer.readObject(in);
- }
-
- @Override
- protected void deserializeKey(DataInput in) throws IOException,
- ClassNotFoundException {
- this.serializedKey = DataSerializer.readByteArray(in);
- this.key = BlobHelper.deserializeBlob(this.serializedKey,
- InternalDataSerializer.getVersionForDataStreamOrNull(in), null);
- keyIsSerialized = true;
- }
-
- @Override
- public int getDSFID() {
-
- return HDFS_GATEWAY_EVENT_IMPL;
- }
- public byte[] getSerializedKey() {
-
- return this.serializedKey;
- }
-
- public VersionTag getVersionTag() {
-
- return this.versionTag;
- }
-
- /**
- * Returns the size on HDFS of this event
- * @param writeOnly
- */
- public int getSizeOnHDFSInBytes(boolean writeOnly) {
-
- if (writeOnly)
- return UnsortedHDFSQueuePersistedEvent.getSizeInBytes(this.serializedKey.length,
- getSerializedValueSize(), this.versionTag);
- else
- return SortedHDFSQueuePersistedEvent.getSizeInBytes(this.serializedKey.length,
- getSerializedValueSize(), this.versionTag);
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSIntegrationUtil.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSIntegrationUtil.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSIntegrationUtil.java
deleted file mode 100644
index 740a607..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSIntegrationUtil.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.RegionAttributes;
-import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
-import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueueFactory;
-import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl;
-import com.gemstone.gemfire.cache.hdfs.HDFSStore;
-import com.gemstone.gemfire.i18n.LogWriterI18n;
-import com.gemstone.gemfire.internal.Assert;
-import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-
-/**
- * Contains utility functions
- *
- *
- */
-public class HDFSIntegrationUtil {
-
- public static <K, V> AsyncEventQueue createDefaultAsyncQueueForHDFS(Cache cache, boolean writeOnly, String regionPath) {
- return createAsyncQueueForHDFS(cache, regionPath, writeOnly, null);
- }
-
- private static AsyncEventQueue createAsyncQueueForHDFS(Cache cache, String regionPath, boolean writeOnly,
- HDFSStore configView) {
- LogWriterI18n logger = cache.getLoggerI18n();
- String defaultAsyncQueueName = HDFSStoreFactoryImpl.getEventQueueName(regionPath);
-
- if (configView == null) {
- configView = new HDFSStoreFactoryImpl(cache).getConfigView();
- }
-
-
- AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
- factory.setBatchSize(configView.getBatchSize());
- factory.setPersistent(configView.getBufferPersistent());
- factory.setDiskStoreName(configView.getDiskStoreName());
- factory.setMaximumQueueMemory(configView.getMaxMemory());
- factory.setBatchTimeInterval(configView.getBatchInterval());
- factory.setDiskSynchronous(configView.getSynchronousDiskWrite());
- factory.setDispatcherThreads(configView.getDispatcherThreads());
- factory.setParallel(true);
- factory.addGatewayEventFilter(new HDFSEventQueueFilter(logger));
- ((AsyncEventQueueFactoryImpl) factory).setBucketSorted(!writeOnly);
- ((AsyncEventQueueFactoryImpl) factory).setIsHDFSQueue(true);
-
- AsyncEventQueue asyncQ = null;
-
- if (!writeOnly)
- asyncQ = factory.create(defaultAsyncQueueName, new HDFSEventListener(cache.getLoggerI18n()));
- else
- asyncQ = factory.create(defaultAsyncQueueName, new HDFSWriteOnlyStoreEventListener(cache.getLoggerI18n()));
-
- logger.fine("HDFS: async queue created for HDFS. Id: " + asyncQ.getId() + ". Disk store: "
- + asyncQ.getDiskStoreName() + ". Batch size: " + asyncQ.getBatchSize() + ". bucket sorted: " + !writeOnly);
- return asyncQ;
-
- }
-
- public static void createAndAddAsyncQueue(String regionPath, RegionAttributes regionAttributes, Cache cache) {
- if (!regionAttributes.getDataPolicy().withHDFS()) {
- return;
- }
-
- String leaderRegionPath = getLeaderRegionPath(regionPath, regionAttributes, cache);
-
- String defaultAsyncQueueName = HDFSStoreFactoryImpl.getEventQueueName(leaderRegionPath);
- if (cache.getAsyncEventQueue(defaultAsyncQueueName) == null) {
- if (regionAttributes.getHDFSStoreName() != null && regionAttributes.getPartitionAttributes() != null
- && !(regionAttributes.getPartitionAttributes().getLocalMaxMemory() == 0)) {
- HDFSStore store = ((GemFireCacheImpl) cache).findHDFSStore(regionAttributes.getHDFSStoreName());
- if (store == null) {
- throw new IllegalStateException(
- LocalizedStrings.HOPLOG_HDFS_STORE_NOT_FOUND.toLocalizedString(regionAttributes.getHDFSStoreName()));
- }
- HDFSIntegrationUtil
- .createAsyncQueueForHDFS(cache, leaderRegionPath, regionAttributes.getHDFSWriteOnly(), store);
- }
- }
- }
-
- private static String getLeaderRegionPath(String regionPath, RegionAttributes regionAttributes, Cache cache) {
- String colocated;
- while (regionAttributes.getPartitionAttributes() != null
- && (colocated = regionAttributes.getPartitionAttributes().getColocatedWith()) != null) {
- // Do not waitOnInitialization() for PR
- GemFireCacheImpl gfc = (GemFireCacheImpl) cache;
- Region colocatedRegion = gfc.getPartitionedRegion(colocated, false);
- if (colocatedRegion == null) {
- Assert.fail("Could not find parent region " + colocated + " for " + regionPath);
- }
- regionAttributes = colocatedRegion.getAttributes();
- regionPath = colocatedRegion.getFullPath();
- }
- return regionPath;
- }
-
-}