Posted to commits@geode.apache.org by up...@apache.org on 2016/04/27 22:50:10 UTC
[24/25] incubator-geode git commit: GEODE-10: Reinstating HDFS persistence code
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/FlushObserver.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/FlushObserver.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/FlushObserver.java
new file mode 100644
index 0000000..f69b3dc
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/FlushObserver.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Observes and reacts to flush events.
+ */
+public interface FlushObserver {
+ public interface AsyncFlushResult {
+ /**
+ * Waits for the most recently enqueued batch to completely flush.
+ *
+ * @param time the time to wait
+ * @param unit the time unit
+ * @return true if flushed before the timeout
+     * @throws InterruptedException if interrupted while waiting
+ */
+ public boolean waitForFlush(long time, TimeUnit unit) throws InterruptedException;
+ }
+
+ /**
+ * Returns true when the queued events should be drained from the queue
+ * immediately.
+ *
+ * @return true if draining
+ */
+ boolean shouldDrainImmediately();
+
+  /**
+   * Begins flushing the queued events.
+   *
+   * @return the async result
+   */
+ public AsyncFlushResult flush();
+}
+
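For context, here is a minimal usage sketch for this interface. It is illustrative only: the "queue" holder is hypothetical, while HDFSBucketRegionQueue.flush() later in this commit is a real call site.

    import java.util.concurrent.TimeUnit;

    FlushObserver observer = queue.getFlushObserver(); // hypothetical holder

    // Drain without batching delay when the observer asks for it.
    if (observer.shouldDrainImmediately()) {
      // ... drain the queued events now ...
    }

    // Kick off an asynchronous flush and bound the wait.
    FlushObserver.AsyncFlushResult result = observer.flush();
    if (!result.waitForFlush(30, TimeUnit.SECONDS)) {
      // the most recently enqueued batch did not flush within 30 seconds
    }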
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSBucketRegionQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSBucketRegionQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSBucketRegionQueue.java
new file mode 100644
index 0000000..9127e4d
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSBucketRegionQueue.java
@@ -0,0 +1,1232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.CacheWriterException;
+import com.gemstone.gemfire.cache.EntryNotFoundException;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.TimeoutException;
+import com.gemstone.gemfire.cache.hdfs.internal.FlushObserver.AsyncFlushResult;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue.SortedEventBuffer.BufferIterator;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.Assert;
+import com.gemstone.gemfire.internal.cache.AbstractBucketRegionQueue;
+import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+import com.gemstone.gemfire.internal.cache.ForceReattemptException;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.InternalRegionArguments;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.RegionEventImpl;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.ByteComparator;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.CursorIterator;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import org.apache.hadoop.hbase.util.Bytes;
+
+
+/**
+ * This class holds the sorted event list required for HDFS.
+ */
+public class HDFSBucketRegionQueue extends AbstractBucketRegionQueue {
+ private static final boolean VERBOSE = Boolean.getBoolean("hdfsBucketRegionQueue.VERBOSE");
+ private final int batchSize;
+ volatile HDFSEventQueue hdfsEventQueue = null;
+
+ // set before releasing the primary lock.
+ private final AtomicBoolean releasingPrimaryLock = new AtomicBoolean(true);
+
+ // This is used to keep track of the current size of the queue in bytes.
+ final AtomicLong queueSizeInBytes = new AtomicLong(0);
+ public boolean isBucketSorted = true;
+ /**
+ * @param regionName
+ * @param attrs
+ * @param parentRegion
+ * @param cache
+ * @param internalRegionArgs
+ */
+ public HDFSBucketRegionQueue(String regionName, RegionAttributes attrs,
+ LocalRegion parentRegion, GemFireCacheImpl cache,
+ InternalRegionArguments internalRegionArgs) {
+ super(regionName, attrs, parentRegion, cache, internalRegionArgs);
+
+ this.isBucketSorted = internalRegionArgs.getPartitionedRegion().getParallelGatewaySender().getBucketSorted();
+ if (isBucketSorted)
+ hdfsEventQueue = new MultiRegionSortedQueue();
+ else
+ hdfsEventQueue = new EventQueue();
+
+ batchSize = internalRegionArgs.getPartitionedRegion().
+ getParallelGatewaySender().getBatchSize() * 1024 *1024;
+ this.keySet();
+ }
+ @Override
+ protected void initialize(InputStream snapshotInputStream,
+ InternalDistributedMember imageTarget,
+ InternalRegionArguments internalRegionArgs) throws TimeoutException,
+ IOException, ClassNotFoundException {
+
+ super.initialize(snapshotInputStream, imageTarget, internalRegionArgs);
+
+ loadEventsFromTempQueue();
+
+ this.initialized = true;
+ notifyEventProcessor();
+ }
+
+ private TreeSet<Long> createSkipListFromMap(Set keySet) {
+ TreeSet<Long> sortedKeys = null;
+ if (!hdfsEventQueue.isEmpty())
+ return sortedKeys;
+
+ if (!keySet.isEmpty()) {
+ sortedKeys = new TreeSet<Long>(keySet);
+ if (!sortedKeys.isEmpty())
+ {
+ for (Long key : sortedKeys) {
+ if (this.isBucketSorted) {
+ Object hdfsevent = getNoLRU(key, true, false, false);
+ if (hdfsevent == null) { // this can happen when tombstones are recovered.
+ if (logger.isDebugEnabled() || VERBOSE) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Discarding key " + key + ", no event recovered"));
+ }
+ } else {
+ int eventSize = ((HDFSGatewayEventImpl)hdfsevent).
+ getSizeOnHDFSInBytes(!this.isBucketSorted);
+ hdfsEventQueue.put(key,(HDFSGatewayEventImpl)hdfsevent, eventSize );
+ queueSizeInBytes.getAndAdd(eventSize);
+ }
+ }
+ else {
+ Object hdfsevent = getNoLRU(key, true, false, false);
+ if (hdfsevent != null) { // hdfs event can be null when tombstones are recovered.
+ queueSizeInBytes.getAndAdd(((HDFSGatewayEventImpl)hdfsevent).
+ getSizeOnHDFSInBytes(!this.isBucketSorted));
+ }
+ ((EventQueue)hdfsEventQueue).put(key);
+ }
+
+ }
+ getEventSeqNum().setIfGreater(sortedKeys.last());
+ }
+
+ }
+ if (logger.isDebugEnabled() || VERBOSE) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG,
+ "For bucket " + getId() + ", total keys recovered are : " + keySet.size()
+ + " and the seqNo is " + getEventSeqNum()));
+ }
+ return sortedKeys;
+ }
+
+ @Override
+ protected void basicClear(RegionEventImpl ev) {
+ super.basicClear(ev);
+ queueSizeInBytes.set(0);
+ if ( this.getBucketAdvisor().isPrimary()) {
+ this.hdfsEventQueue.clear();
+ }
+ }
+
+ protected void clearQueues(){
+ queueSizeInBytes.set(0);
+ if ( this.getBucketAdvisor().isPrimary()) {
+ this.hdfsEventQueue.clear();
+ }
+ }
+
+ @Override
+ protected void basicDestroy(final EntryEventImpl event,
+ final boolean cacheWrite, Object expectedOldValue)
+ throws EntryNotFoundException, CacheWriterException, TimeoutException {
+ super.basicDestroy(event, cacheWrite, expectedOldValue);
+ }
+
+ ArrayList peekABatch() {
+ ArrayList result = new ArrayList();
+ hdfsEventQueue.peek(result);
+ return result;
+ }
+
+ @Override
+ protected void addToEventQueue(Object key, boolean didPut, EntryEventImpl event, int sizeOfHDFSEvent) {
+ if (didPut && this.getBucketAdvisor().isPrimary()) {
+ HDFSGatewayEventImpl hdfsEvent = (HDFSGatewayEventImpl)event.getValue();
+ if (sizeOfHDFSEvent == -1) {
+ try {
+ // the size is calculated only on primary before event is inserted in the bucket.
+ // If this node became primary after size was calculated, sizeOfHDFSEvent will be -1.
+ // Try to get the size. #50016
+ sizeOfHDFSEvent = hdfsEvent.getSizeOnHDFSInBytes(!((HDFSBucketRegionQueue)this).isBucketSorted);
+ } catch (Throwable e) {
+ // Ignore any exception while fetching the size.
+ sizeOfHDFSEvent = 0;
+ }
+ }
+ queueSizeInBytes.getAndAdd(sizeOfHDFSEvent);
+ if (this.initialized) {
+ Long longKey = (Long)key;
+ this.hdfsEventQueue.put(longKey, hdfsEvent, sizeOfHDFSEvent);
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Put successfully in the queue : " + hdfsEvent + " . Queue initialized: "
+ + this.initialized);
+ }
+ }
+ }
+
+  /**
+   * Removing individual entries is not supported on this queue; use
+   * destroyKeys(ArrayList) instead.
+   *
+   * @throws UnsupportedOperationException always
+   */
+ public Long remove() throws ForceReattemptException {
+ throw new UnsupportedOperationException("Individual entries cannot be removed in a HDFSBucketRegionQueue");
+ }
+
+  /**
+   * Taking individual entries is not supported on this queue.
+   *
+   * @throws UnsupportedOperationException always
+   */
+ public Object take() throws InterruptedException, ForceReattemptException {
+ throw new UnsupportedOperationException("take() cannot be called for individual entries in a HDFSBucketRegionQueue");
+ }
+
+ public void destroyKeys(ArrayList<HDFSGatewayEventImpl> listToDestroy) {
+
+ HashSet<Long> removedSeqNums = new HashSet<Long>();
+
+ for (int index =0; index < listToDestroy.size(); index++) {
+ HDFSGatewayEventImpl entry = null;
+ if (this.isBucketSorted) {
+ // Remove the events in reverse order so that the events with higher sequence number
+ // are removed last to ensure consistency.
+ entry = listToDestroy.get(listToDestroy.size() - index -1);
+ } else {
+ entry = listToDestroy.get(index);
+ }
+
+ try {
+ if (this.logger.isDebugEnabled())
+ logger.debug("destroying primary key " + entry.getShadowKey() + " bucket id: " + this.getId());
+ // removed from peeked list
+ boolean deleted = this.hdfsEventQueue.remove(entry);
+ if (deleted) {
+ // this is an onheap event so a call to size should be ok.
+ long entrySize = entry.getSizeOnHDFSInBytes(!this.isBucketSorted);
+ destroyKey(entry.getShadowKey());
+ long queueSize = queueSizeInBytes.getAndAdd(-1*entrySize);
+ if (queueSize < 0) {
+ // In HA scenarios, queueSizeInBytes can go awry.
+ queueSizeInBytes.compareAndSet(queueSize, 0);
+ }
+ removedSeqNums.add(entry.getShadowKey());
+ }
+ }catch (ForceReattemptException e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("ParallelGatewaySenderQueue#remove->HDFSBucketRegionQueue#destroyKeys: " + "Got ForceReattemptException for " + this
+ + " for bucket = " + this.getId());
+ }
+ }
+ catch(EntryNotFoundException e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("ParallelGatewaySenderQueue#remove->HDFSBucketRegionQueue#destroyKeys: " + "Got EntryNotFoundException for " + this
+ + " for bucket = " + this.getId() + " and key " + entry.getShadowKey());
+ }
+ } finally {
+ entry.release();
+ }
+ }
+
+ if (this.getBucketAdvisor().isPrimary()) {
+ hdfsEventQueue.handleRemainingElements(removedSeqNums);
+ }
+ }
+
+
+ public boolean isReadyForPeek() {
+ return !this.isEmpty() && !this.hdfsEventQueue.isEmpty() && getBucketAdvisor().isPrimary();
+ }
+
+ public long getLastPeekTimeInMillis() {
+ return hdfsEventQueue.getLastPeekTimeInMillis();
+ }
+
+ public long getQueueSizeInBytes() {
+ return queueSizeInBytes.get();
+ }
+  /*
+   * This function is called when the bucket takes on the role of primary.
+   */
+ @Override
+ public void beforeAcquiringPrimaryState() {
+
+ queueSizeInBytes.set(0);
+ if (logger.isDebugEnabled() || VERBOSE) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG,
+ "This node has become primary for bucket " + this.getId() +". " +
+ "Creating sorted data structure for the async queue."));
+ }
+ releasingPrimaryLock.set(false);
+
+    // clear the hdfs queue in case it still has elements left over from when
+    // this node was primary in the past
+ hdfsEventQueue.clear();
+ if (isBucketSorted)
+ hdfsEventQueue = new MultiRegionSortedQueue();
+ else
+ hdfsEventQueue = new EventQueue();
+
+ TreeSet<Long> sortedKeys = createSkipListFromMap(this.keySet());
+
+ if (sortedKeys != null && sortedKeys.size() > 0) {
+ // Mark the events equal to batch size as duplicate.
+ // calculate the batch size based on the number of events currently in the queue
+ // This is an approximation.
+ long batchSizeMB = this.getPartitionedRegion().getParallelGatewaySender().getBatchSize();
+ long batchSizeInBytes = batchSizeMB*1024*1024;
+ long totalBucketSize = queueSizeInBytes.get();
+ totalBucketSize = totalBucketSize > 0 ? totalBucketSize: 1;
+ long totalEntriesInBucket = this.entryCount();
+ totalEntriesInBucket = totalEntriesInBucket > 0 ? totalEntriesInBucket: 1;
+
+ long perEntryApproxSize = totalBucketSize/totalEntriesInBucket;
+ perEntryApproxSize = perEntryApproxSize > 0 ? perEntryApproxSize: 1;
+
+ int batchSize = (int)(batchSizeInBytes/perEntryApproxSize);
+
+ if (logger.isDebugEnabled() || VERBOSE) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG,
+ "Calculating batch size " + " batchSizeMB: " + batchSizeMB + " batchSizeInBytes: " + batchSizeInBytes +
+ " totalBucketSize: " + totalBucketSize + " totalEntriesInBucket: " + totalEntriesInBucket +
+ " perEntryApproxSize: " + perEntryApproxSize + " batchSize: " + batchSize ));
+ }
+
+ markEventsAsDuplicate(batchSize, sortedKeys.iterator());
+ }
+ }
+
+ @Override
+ public void beforeReleasingPrimaryLockDuringDemotion() {
+ queueSizeInBytes.set(0);
+ releasingPrimaryLock.set(true);
+ // release memory in case of a clean transition
+ hdfsEventQueue.clear();
+ }
+
+  /**
+   * Searches the skip list and the peeked skip list for a given region key.
+   *
+   * @param region the region the key belongs to
+   * @param regionKey the serialized key to look up
+   */
+ public HDFSGatewayEventImpl getObjectForRegionKey(Region region, byte[] regionKey) {
+ // get can only be called for a sorted queue.
+ // Calling get with Long.MIN_VALUE seq number ensures that
+ // the list will return the key which has highest seq number.
+ return hdfsEventQueue.get(region, regionKey, Long.MIN_VALUE);
+ }
+
+ /**
+ * Get an iterator on the queue, passing in the partitioned region
+ * we want to iterate over the events from.
+ */
+ public SortedEventQueueIterator iterator(Region region) {
+ return hdfsEventQueue.iterator(region);
+ }
+
+ public long totalEntries() {
+ return entryCount();
+ }
+
+ /**
+ * Ideally this function should be called from a thread periodically to
+ * rollover the skip list when it is above a certain size.
+ *
+ */
+ public void rolloverSkipList() {
+ // rollover can only be called for a sorted queue.
+ hdfsEventQueue.rollover();
+ }
+
+ public boolean shouldDrainImmediately() {
+ return hdfsEventQueue.getFlushObserver().shouldDrainImmediately();
+ }
+
+ public AsyncFlushResult flush() {
+ if (logger.isDebugEnabled() || VERBOSE) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Flush requested"));
+ }
+ return hdfsEventQueue.getFlushObserver().flush();
+ }
+
+  /**
+   * This class keeps the regionkey and seqNum. The objects of this class are
+   * kept in a concurrent skip list. The order of elements is decided based on the
+   * comparison of regionKey + seqNum. This kind of comparison allows us to keep
+   * multiple updates on a single key (because each update has a different seq num).
+   */
+ static class KeyToSeqNumObject implements Comparable<KeyToSeqNumObject>
+ {
+ private byte[] regionkey;
+ private Long seqNum;
+
+ KeyToSeqNumObject(byte[] regionkey, Long seqNum){
+ this.regionkey = regionkey;
+ this.seqNum = seqNum;
+ }
+
+    /**
+     * Compares the region key first; if the keys are equal, the seq num is compared.
+     * This function is key because it ensures that the skip lists hold the elements
+     * in the order we want and that, for multiple updates on a key, a get fetches
+     * the most recent one. Currently we compare seq numbers, but this will have to
+     * change to version stamps. A list can hold elements in the following sequence:
+     * K1 Value1 version : 1
+     * K2 Value2a version : 2
+     * K2 Value2 version : 1
+     * K3 Value3 version : 1
+     * A get on K2 should return K2 Value2a.
+     */
+ @Override
+ public int compareTo(KeyToSeqNumObject o) {
+ int compareOutput = ByteComparator.compareBytes(
+ this.getRegionkey(), 0, this.getRegionkey().length, o.getRegionkey(), 0, o.getRegionkey().length);
+ if (compareOutput != 0 )
+ return compareOutput;
+
+      // If the keys are the same and this is an object with the dummy seq number,
+      // return -1. This ensures that a ceiling lookup on the skip list positions
+      // at the entry with the highest seq number for the key.
+ if (this.getSeqNum() == Long.MIN_VALUE)
+ return -1;
+
+      // this just maintains consistency with the above case.
+ if (o.getSeqNum() == Long.MIN_VALUE)
+ return 1;
+
+      // the minus operator pushes entries with a lower seq number to the end so
+      // that the order described above is maintained and the entries with a
+      // higher version are fetched on a get.
+ return this.getSeqNum().compareTo(o.getSeqNum()) * -1;
+ }
+
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof KeyToSeqNumObject))
+        return false;
+
+      return this.compareTo((KeyToSeqNumObject) o) == 0;
+    }
+
+ public int hashCode() {
+ assert false : "hashCode not designed";
+ return -1;
+ }
+
+ byte[] getRegionkey() {
+ return regionkey;
+ }
+
+ public Long getSeqNum() {
+ return seqNum;
+ }
+
+ public void setSeqNum(Long seqNum) {
+ this.seqNum = seqNum;
+ }
+
+ @Override
+ public String toString() {
+ return EntryEventImpl.deserialize(regionkey) + " {" + seqNum + "}";
+ }
+ }
+
+ public interface HDFSEventQueue {
+ FlushObserver getFlushObserver();
+
+ /** puts an event in the queue. */
+ public void put (long key, HDFSGatewayEventImpl event, int size);
+
+ public SortedEventQueueIterator iterator(Region region);
+
+ public void rollover();
+
+ /** Get a value from the queue
+ * @throws IllegalStateException if this queue doesn't support get
+ **/
+ public HDFSGatewayEventImpl get(Region region, byte[] regionKey,
+ long minValue);
+
+    // Peeks a batch of the size specified by batchSize
+    // and adds the results to the array list.
+    public void peek(ArrayList result);
+
+    // Checks if there are elements to be peeked.
+ public boolean isEmpty();
+
+ // removes the event if it has already been peeked.
+ public boolean remove(HDFSGatewayEventImpl event);
+
+ // take care of the elements that were peeked
+ // but were not removed after a batch dispatch
+ // due to concurrency effects.
+ public void handleRemainingElements(HashSet<Long> listToBeremoved);
+
+ // clears the list.
+ public void clear();
+
+ // get the time when the last peek was done.
+ public long getLastPeekTimeInMillis();
+ }
+
+ class MultiRegionSortedQueue implements HDFSEventQueue {
+ ConcurrentMap<String, SortedEventQueue> regionToEventQueue = new ConcurrentHashMap<String, SortedEventQueue>();
+ volatile Set<SortedEventQueue> peekedQueues = Collections.EMPTY_SET;
+ private final AtomicBoolean peeking = new AtomicBoolean(false);
+ long lastPeekTimeInMillis = System.currentTimeMillis();
+
+ private final FlushObserver flush = new FlushObserver() {
+ @Override
+ public AsyncFlushResult flush() {
+ final Set<AsyncFlushResult> flushes = new HashSet<AsyncFlushResult>();
+ for (SortedEventQueue queue : regionToEventQueue.values()) {
+ flushes.add(queue.getFlushObserver().flush());
+ }
+
+      return new AsyncFlushResult() {
+        @Override
+        public boolean waitForFlush(long timeout, TimeUnit unit) throws InterruptedException {
+          // track a fixed deadline so each child flush consumes only the time
+          // it actually waited, instead of re-subtracting the total elapsed
+          // time on every iteration
+          long deadline = System.nanoTime() + unit.toNanos(timeout);
+          for (AsyncFlushResult afr : flushes) {
+            long remaining = Math.max(0, deadline - System.nanoTime());
+            if (!afr.waitForFlush(remaining, TimeUnit.NANOSECONDS)) {
+              return false;
+            }
+          }
+          return true;
+        }
+      };
+ }
+
+ @Override
+ public boolean shouldDrainImmediately() {
+ for (SortedEventQueue queue : regionToEventQueue.values()) {
+ if (queue.getFlushObserver().shouldDrainImmediately()) {
+ return true;
+ }
+ }
+ return false;
+ }
+ };
+
+ @Override
+ public FlushObserver getFlushObserver() {
+ return flush;
+ }
+
+ @Override
+ public void put(long key, HDFSGatewayEventImpl event, int size) {
+
+ String region = event.getRegionPath();
+ SortedEventQueue regionQueue = regionToEventQueue.get(region);
+ if(regionQueue == null) {
+ regionToEventQueue.putIfAbsent(region, new SortedEventQueue());
+ regionQueue = regionToEventQueue.get(region);
+ }
+ regionQueue.put(key, event, size);
+ }
+
+ @Override
+ public void peek(ArrayList result) {
+      // The elements that were peeked last time have not been persisted to
+      // HDFS yet. The next batch cannot be taken out until that is done.
+ if (!peeking.compareAndSet(false, true)) {
+ if (logger.isTraceEnabled() || VERBOSE) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Peek already in progress, aborting"));
+ }
+ return;
+ }
+      // Maintain a separate set of peeked queues.
+      // All of these queues are stateful, and expect handleRemainingElements
+      // and clear to be called on them iff peek was called on them.
+      // However, new queues may be created in the meantime.
+ peekedQueues = Collections.newSetFromMap(new ConcurrentHashMap<SortedEventQueue, Boolean>(regionToEventQueue.size()));
+
+ //Peek from all of the existing queues
+ for(SortedEventQueue queue : regionToEventQueue.values()) {
+ if(!queue.isEmpty()) {
+ queue.peek(result);
+ peekedQueues.add(queue);
+ }
+ }
+ if (result.isEmpty())
+ peeking.set(false);
+
+
+ this.lastPeekTimeInMillis = System.currentTimeMillis();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ for(SortedEventQueue queue : regionToEventQueue.values()) {
+ if(!queue.isEmpty()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public boolean remove(HDFSGatewayEventImpl event) {
+ String region = event.getRegionPath();
+ SortedEventQueue regionQueue = regionToEventQueue.get(region);
+ return regionQueue.remove(event);
+ }
+
+ @Override
+ public void handleRemainingElements(HashSet<Long> removedSeqNums){
+ for(SortedEventQueue queue : peekedQueues) {
+ queue.handleRemainingElements(removedSeqNums);
+ }
+ peekedQueues.clear();
+ peeking.set(false);
+ }
+
+ @Override
+ public void clear() {
+ for(SortedEventQueue queue : regionToEventQueue.values()) {
+ queue.clear();
+ }
+ peekedQueues.clear();
+ peeking.set(false);
+ }
+
+ @Override
+ public long getLastPeekTimeInMillis() {
+ return this.lastPeekTimeInMillis;
+ }
+
+ @Override
+ public HDFSGatewayEventImpl get(Region region, byte[] regionKey,
+ long minValue) {
+ SortedEventQueue queue = regionToEventQueue.get(region.getFullPath());
+ if(queue == null) {
+ return null;
+ }
+ return queue.get(region, regionKey, minValue);
+ }
+
+ @Override
+ public SortedEventQueueIterator iterator(Region region) {
+ SortedEventQueue queue = regionToEventQueue.get(region.getFullPath());
+ if(queue == null) {
+ return new SortedEventQueueIterator(new LinkedBlockingDeque<SortedEventBuffer>());
+ }
+ return queue.iterator(region);
+ }
+
+ @Override
+ public void rollover() {
+ for(SortedEventQueue queue : regionToEventQueue.values()) {
+ queue.rollover();
+ }
+ }
+ }
+
+ class EventQueue implements HDFSEventQueue {
+ private final SignalledFlushObserver flush = new SignalledFlushObserver();
+ private final BlockingQueue<Long> eventSeqNumQueue = new LinkedBlockingQueue<Long>();
+ private final BlockingQueue<Long> peekedEvents = new LinkedBlockingQueue<Long>();
+ private long lastPeekTimeInMillis = System.currentTimeMillis();
+
+ public EventQueue() {
+
+ }
+
+ @Override
+ public FlushObserver getFlushObserver() {
+ return flush;
+ }
+
+ @Override
+ public void put(long key, HDFSGatewayEventImpl event, int size) {
+ put(key);
+ }
+ public void put (long key) {
+ eventSeqNumQueue.add(key);
+ flush.push();
+ incQueueSize();
+ }
+
+
+ @Override
+ public HDFSGatewayEventImpl get(Region region, byte[] regionKey,
+ long minValue) {
+ throw new InternalGemFireError("Get not supported on unsorted queue");
+ }
+
+ @Override
+ public void peek(ArrayList peekedEntries) {
+ if (peekedEvents.size() != 0) {
+ return;
+ }
+
+ for(int size=0; size < batchSize; ) {
+ Long seqNum = eventSeqNumQueue.peek();
+ if (seqNum == null) {
+ // queue is now empty, return
+ break;
+ }
+ Object object = getNoLRU(seqNum, true, false, false);
+ if (object != null) {
+ peekedEvents.add(seqNum);
+ size += ((HDFSGatewayEventImpl)object).getSizeOnHDFSInBytes(!isBucketSorted);
+ peekedEntries.add(object);
+
+ } else {
+          logger.debug("The entry corresponding to the sequence number " +
+              seqNum + " is missing. This can happen when an entry is already" +
+              " dispatched before a bucket moved.");
+ // event is being ignored. Decrease the queue size
+ decQueueSize();
+ flush.pop(1);
+
+ }
+ eventSeqNumQueue.poll();
+
+ }
+ this.lastPeekTimeInMillis = System.currentTimeMillis();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return eventSeqNumQueue.isEmpty();
+ }
+
+
+ @Override
+ public boolean remove(HDFSGatewayEventImpl event) {
+ boolean deleted = peekedEvents.remove(event.getShadowKey());
+ if (deleted)
+ decQueueSize();
+ return deleted;
+ }
+
+ @Override
+    // It looks like there is no need for this function in EventQueue.
+ public void handleRemainingElements(HashSet<Long> removedSeqNums) {
+ flush.pop(removedSeqNums.size());
+ eventSeqNumQueue.addAll(peekedEvents);
+ peekedEvents.clear();
+ }
+
+ @Override
+ public void clear() {
+ flush.clear();
+ decQueueSize(eventSeqNumQueue.size());
+ eventSeqNumQueue.clear();
+ decQueueSize(peekedEvents.size());
+ peekedEvents.clear();
+ }
+
+ @Override
+ public long getLastPeekTimeInMillis() {
+ return this.lastPeekTimeInMillis;
+ }
+ @Override
+ public SortedEventQueueIterator iterator(Region region) {
+ throw new InternalGemFireError("not supported on unsorted queue");
+ }
+ @Override
+ public void rollover() {
+ throw new InternalGemFireError("not supported on unsorted queue");
+ }
+ }
+
+ class SortedEventQueue implements HDFSEventQueue {
+ private final SignalledFlushObserver flush = new SignalledFlushObserver();
+
+ // List of all the skip lists that hold the data
+ final Deque<SortedEventBuffer> queueOfLists =
+ new LinkedBlockingDeque<SortedEventBuffer>();
+
+ // This points to the tail of the queue
+ volatile SortedEventBuffer currentSkipList = new SortedEventBuffer();
+
+ private final AtomicBoolean peeking = new AtomicBoolean(false);
+
+ private long lastPeekTimeInMillis = System.currentTimeMillis();
+
+ public SortedEventQueue() {
+ queueOfLists.add(currentSkipList);
+ }
+
+ @Override
+ public FlushObserver getFlushObserver() {
+ return flush;
+ }
+
+ public boolean remove(HDFSGatewayEventImpl event) {
+ SortedEventBuffer eventBuffer = queueOfLists.peek();
+ if (eventBuffer != null) {
+ return eventBuffer.copyToBuffer(event);
+ }
+ else {
+ // This can happen when the queue is cleared because of bucket movement
+ // before the remove is called.
+ return true;
+ }
+ }
+
+ public void clear() {
+ flush.clear();
+ for (SortedEventBuffer buf : queueOfLists) {
+ decQueueSize(buf.size());
+ buf.clear();
+ }
+
+ queueOfLists.clear();
+ rollList(false);
+
+ peeking.set(false);
+ }
+
+ public boolean isEmpty() {
+ if (queueOfLists.size() == 1)
+ return queueOfLists.peek().isEmpty();
+ return false;
+ }
+
+ public void put(long key, HDFSGatewayEventImpl event, int eventSize) {
+ if (logger.isTraceEnabled() || VERBOSE) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Inserting key " + event + " into list " + System.identityHashCode(currentSkipList)));
+ }
+ putInList(new KeyToSeqNumObject(((HDFSGatewayEventImpl)event).getSerializedKey(), key),
+ eventSize);
+ }
+
+ private void putInList(KeyToSeqNumObject entry, int sizeInBytes) {
+ // It was observed during testing that peek can start peeking
+ // elements from a list to which a put is happening. This happens
+ // when the peek changes the value of currentSkiplist to a new list
+ // but the put continues to write to an older list.
+ // So there is a possibility that an element is added to the list
+      // that has already been peeked. To handle this case, the
+      // handleRemainingElements function re-adds the elements that were not peeked.
+ if (currentSkipList.add(entry, sizeInBytes) == null) {
+ flush.push();
+ incQueueSize();
+ }
+ }
+
+ public void rollover(boolean forceRollover) {
+ if (currentSkipList.bufferSize() >= batchSize || forceRollover) {
+ rollList(forceRollover);
+ }
+ }
+
+ /**
+ * Ideally this function should be called from a thread periodically to
+ * rollover the skip list when it is above a certain size.
+ *
+ */
+ public void rollover() {
+ rollover(false);
+ }
+
+ public void peek(ArrayList peekedEntries) {
+      // The elements that were peeked last time have not been persisted to
+      // HDFS yet. The next batch cannot be taken out until that is done.
+ if (!peeking.compareAndSet(false, true)) {
+ if (logger.isTraceEnabled() || VERBOSE) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Peek already in progress, aborting"));
+ }
+ return;
+ }
+
+ if (queueOfLists.size() == 1) {
+ rollList(false);
+ }
+
+ Assert.assertTrue(queueOfLists.size() > 1, "Cannot peek from head of queue");
+ BufferIterator itr = queueOfLists.peek().iterator();
+ while (itr.hasNext()) {
+ KeyToSeqNumObject entry = itr.next();
+ if (logger.isTraceEnabled() || VERBOSE) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Peeking key " + entry + " from list " + System.identityHashCode(queueOfLists.peek())));
+ }
+
+ HDFSGatewayEventImpl ev = itr.value();
+ ev.copyOffHeapValue();
+ peekedEntries.add(ev);
+ }
+
+ // discard an empty batch as it is not processed and will plug up the
+ // queue
+ if (peekedEntries.isEmpty()) {
+ SortedEventBuffer empty = queueOfLists.remove();
+ if (logger.isTraceEnabled() || VERBOSE) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Discarding empty batch " + empty));
+ }
+ peeking.set(false);
+ }
+ this.lastPeekTimeInMillis = System.currentTimeMillis();
+ }
+
+ public HDFSGatewayEventImpl get(Region region, byte[] regionKey, long key) {
+ KeyToSeqNumObject event = new KeyToSeqNumObject(regionKey, key);
+ Iterator<SortedEventBuffer> queueIterator = queueOfLists.descendingIterator();
+ while (queueIterator.hasNext()) {
+ HDFSGatewayEventImpl evt = queueIterator.next().getFromQueueOrBuffer(event);
+ if (evt != null) {
+ return evt;
+ }
+ }
+ return null;
+ }
+
+ public void handleRemainingElements(HashSet<Long> removedSeqNums) {
+ if (!peeking.get()) {
+ if (logger.isTraceEnabled() || VERBOSE) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Not peeked, just cleaning up empty batch; current list is " + currentSkipList));
+ }
+ return;
+ }
+
+ Assert.assertTrue(queueOfLists.size() > 1, "Cannot remove only event list");
+
+ // all done with the peeked elements, okay to throw away now
+ SortedEventBuffer buf = queueOfLists.remove();
+ SortedEventBuffer.BufferIterator bufIter = buf.iterator();
+      // Check if the removed buffer has any extra events. If yes, check if these
+      // extra events are still part of the region. If yes, reinsert them, as they
+      // were probably inserted into this list while it was being peeked.
+ while (bufIter.hasNext()) {
+ KeyToSeqNumObject key = bufIter.next();
+ if (!removedSeqNums.contains(key.getSeqNum())) {
+ HDFSGatewayEventImpl evt = (HDFSGatewayEventImpl) getNoLRU(key.getSeqNum(), true, false, false);
+ if (evt != null) {
+ flush.push();
+ incQueueSize();
+ queueOfLists.getFirst().add(key, evt.getSizeOnHDFSInBytes(!isBucketSorted));
+ }
+ }
+ }
+
+ decQueueSize(buf.size());
+ flush.pop(buf.size());
+ peeking.set(false);
+ }
+
+ public long getLastPeekTimeInMillis(){
+ return this.lastPeekTimeInMillis;
+ }
+
+ NavigableSet<KeyToSeqNumObject> getPeeked() {
+ assert peeking.get();
+ return queueOfLists.peek().keySet();
+ }
+
+ private synchronized void rollList(boolean forceRollover) {
+ if (currentSkipList.bufferSize() < batchSize && queueOfLists.size() > 1 && !forceRollover)
+ return;
+ SortedEventBuffer tmp = new SortedEventBuffer();
+ queueOfLists.add(tmp);
+ if (logger.isTraceEnabled() || VERBOSE) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Rolling over list from " + currentSkipList + " to list " + tmp));
+ }
+ currentSkipList = tmp;
+ }
+
+ @Override
+ public SortedEventQueueIterator iterator(Region region) {
+ return new SortedEventQueueIterator(queueOfLists);
+ }
+ }
+
+ public class SortedEventBuffer {
+ private final HDFSGatewayEventImpl NULL = new HDFSGatewayEventImpl();
+
+ private final ConcurrentSkipListMap<KeyToSeqNumObject, HDFSGatewayEventImpl> events = new ConcurrentSkipListMap<KeyToSeqNumObject, HDFSGatewayEventImpl>();
+
+ private int bufferSize = 0;
+
+ public boolean copyToBuffer(HDFSGatewayEventImpl event) {
+ KeyToSeqNumObject key = new KeyToSeqNumObject(event.getSerializedKey(), event.getShadowKey());
+ if (events.containsKey(key)) {
+ // After an event has been delivered in a batch, we copy it into the
+ // buffer so that it can be returned by an already in progress iterator.
+ // If we do not do this it is possible to miss events since the hoplog
+ // iterator uses a fixed set of files that are determined when the
+ // iterator is created. The events will be GC'd once the buffer is no
+ // longer strongly referenced.
+ HDFSGatewayEventImpl oldVal = events.put(key, event);
+ assert oldVal == NULL;
+
+ return true;
+ }
+      // If the primary lock is being relinquished, the events map is cleared and
+      // that is probably why we are here; report success in that case.
+      return releasingPrimaryLock.get();
+ }
+
+ public HDFSGatewayEventImpl getFromQueueOrBuffer(KeyToSeqNumObject key) {
+ KeyToSeqNumObject result = events.ceilingKey(key);
+ if (result != null && Bytes.compareTo(key.getRegionkey(), result.getRegionkey()) == 0) {
+
+ // first try to fetch the buffered event to make it fast.
+ HDFSGatewayEventImpl evt = events.get(result);
+ if (evt != NULL) {
+ return evt;
+ }
+ // now try to fetch the event from the queue region
+ evt = (HDFSGatewayEventImpl) getNoLRU(result.getSeqNum(), true, false, false);
+ if (evt != null) {
+ return evt;
+ }
+
+ // try to fetch again from the buffered events to avoid a race between
+ // item deletion and the above two statements.
+ evt = events.get(result);
+ if (evt != NULL) {
+ return evt;
+ }
+
+ }
+ return null;
+ }
+
+ public HDFSGatewayEventImpl add(KeyToSeqNumObject key, int sizeInBytes) {
+ bufferSize += sizeInBytes;
+ return events.put(key, NULL);
+ }
+
+ public void clear() {
+ events.clear();
+ }
+
+ public boolean isEmpty() {
+ return events.isEmpty();
+ }
+
+ public int bufferSize() {
+ return bufferSize;
+ }
+ public int size() {
+ return events.size();
+ }
+ public NavigableSet<KeyToSeqNumObject> keySet() {
+ return events.keySet();
+ }
+
+ public BufferIterator iterator() {
+ return new BufferIterator(events.keySet().iterator());
+ }
+
+ public class BufferIterator implements Iterator<KeyToSeqNumObject> {
+ private final Iterator<KeyToSeqNumObject> src;
+
+ private KeyToSeqNumObject currentKey;
+ private HDFSGatewayEventImpl currentVal;
+
+ private KeyToSeqNumObject nextKey;
+ private HDFSGatewayEventImpl nextVal;
+
+ public BufferIterator(Iterator<KeyToSeqNumObject> src) {
+ this.src = src;
+ moveNext();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return nextVal != null;
+ }
+
+ @Override
+ public KeyToSeqNumObject next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ currentKey = nextKey;
+ currentVal = nextVal;
+
+ moveNext();
+
+ return currentKey;
+ }
+
+ public KeyToSeqNumObject key() {
+ assert currentKey != null;
+ return currentKey;
+ }
+
+ public HDFSGatewayEventImpl value() {
+ assert currentVal != null;
+ return currentVal;
+ }
+
+ private void moveNext() {
+ while (src.hasNext()) {
+ nextKey = src.next();
+ nextVal = getFromQueueOrBuffer(nextKey);
+ if (nextVal != null) {
+ return;
+ } else if (logger.isDebugEnabled() || VERBOSE) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "The entry corresponding to"
+ + " the sequence number " + nextKey.getSeqNum()
+ + " is missing. This can happen when an entry is already"
+ + " dispatched before a bucket moved."));
+ }
+ }
+ nextKey = null;
+ nextVal = null;
+ }
+ }
+ }
+
+ public final class SortedEventQueueIterator implements CursorIterator<HDFSGatewayEventImpl> {
+ /** the iterators to merge */
+ private final List<SortedEventBuffer.BufferIterator> iters;
+
+ /** the current iteration value */
+ private HDFSGatewayEventImpl value;
+
+ public SortedEventQueueIterator(Deque<SortedEventBuffer> queueOfLists) {
+ iters = new ArrayList<SortedEventBuffer.BufferIterator>();
+ for (Iterator<SortedEventBuffer> iter = queueOfLists.descendingIterator(); iter.hasNext();) {
+ SortedEventBuffer.BufferIterator buf = iter.next().iterator();
+ if (buf.hasNext()) {
+ buf.next();
+ iters.add(buf);
+ }
+ }
+ }
+
+ public void close() {
+ value = null;
+ iters.clear();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return !iters.isEmpty();
+ }
+
+ @Override
+ public HDFSGatewayEventImpl next() {
+ if (!hasNext()) {
+ throw new UnsupportedOperationException();
+ }
+
+ int diff = 0;
+ KeyToSeqNumObject min = null;
+ SortedEventBuffer.BufferIterator cursor = null;
+
+ for (Iterator<SortedEventBuffer.BufferIterator> merge = iters.iterator(); merge.hasNext(); ) {
+ SortedEventBuffer.BufferIterator buf = merge.next();
+ KeyToSeqNumObject tmp = buf.key();
+ if (min == null || (diff = Bytes.compareTo(tmp.regionkey, min.regionkey)) < 0) {
+ min = tmp;
+ cursor = buf;
+
+ } else if (diff == 0 && !advance(buf, min)) {
+ merge.remove();
+ }
+ }
+
+ value = cursor.value();
+ assert value != null;
+
+ if (!advance(cursor, min)) {
+ iters.remove(cursor);
+ }
+ return current();
+ }
+
+ @Override
+ public final HDFSGatewayEventImpl current() {
+ return value;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ private boolean advance(SortedEventBuffer.BufferIterator iter, KeyToSeqNumObject key) {
+ while (iter.hasNext()) {
+ if (Bytes.compareTo(iter.next().regionkey, key.regionkey) > 0) {
+ return true;
+ }
+ }
+ return false;
+ }
+ }
+}
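A small illustrative sketch (same-package code, assuming the KeyToSeqNumObject ordering above) of why getObjectForRegionKey() probes the skip list with Long.MIN_VALUE: entries with equal region keys sort by descending seq num, and a MIN_VALUE probe sorts before every real entry for that key, so ceilingKey() lands on the newest update.

    import java.util.concurrent.ConcurrentSkipListMap;

    ConcurrentSkipListMap<KeyToSeqNumObject, String> events =
        new ConcurrentSkipListMap<KeyToSeqNumObject, String>();
    byte[] k2 = new byte[] { 2 };
    events.put(new KeyToSeqNumObject(k2, 1L), "Value2");
    events.put(new KeyToSeqNumObject(k2, 2L), "Value2a");

    // Probe with the dummy MIN_VALUE seq num, as getObjectForRegionKey() does.
    KeyToSeqNumObject newest =
        events.ceilingKey(new KeyToSeqNumObject(k2, Long.MIN_VALUE));
    // newest.getSeqNum() == 2L, i.e. the entry for the latest update "Value2a"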
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEntriesSet.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEntriesSet.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEntriesSet.java
new file mode 100644
index 0000000..c8b7b28
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEntriesSet.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.IOException;
+import java.lang.ref.ReferenceQueue;
+import java.lang.ref.WeakReference;
+import java.util.AbstractSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue.SortedEventQueueIterator;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogSetReader.HoplogIterator;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.cache.BucketRegion;
+import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+import com.gemstone.gemfire.internal.cache.HDFSRegionMap;
+import com.gemstone.gemfire.internal.cache.KeyWithRegionContext;
+import com.gemstone.gemfire.internal.cache.LocalRegion.IteratorType;
+import com.gemstone.gemfire.internal.cache.PrimaryBucketException;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import org.apache.hadoop.hbase.util.Bytes;
+
+@SuppressWarnings("rawtypes")
+public class HDFSEntriesSet extends AbstractSet {
+ private final IteratorType type;
+
+ private final HoplogOrganizer hoplogs;
+ private final HDFSBucketRegionQueue brq;
+
+ private final BucketRegion region;
+ private final ReferenceQueue<HDFSIterator> refs;
+
+ public HDFSEntriesSet(BucketRegion region, HDFSBucketRegionQueue brq,
+ HoplogOrganizer hoplogs, IteratorType type, ReferenceQueue<HDFSIterator> refs) {
+ this.region = region;
+ this.brq = brq;
+ this.hoplogs = hoplogs;
+ this.type = type;
+ this.refs = refs;
+ }
+
+ @Override
+ public HDFSIterator iterator() {
+ HDFSIterator iter = new HDFSIterator(type, region.getPartitionedRegion(), true);
+ if (refs != null) {
+ // we can't rely on an explicit close but we need to free resources
+ //
+      // This approach has the potential to cause excessive memory load and/or
+      // GC problems if an app holds an iterator ref too long. A lease-based
+      // approach where iterators are automatically closed after X secs of
+      // inactivity is a potential alternative (but may require tuning for
+      // certain applications)
+ new WeakReference<HDFSEntriesSet.HDFSIterator>(iter, refs);
+ }
+ return iter;
+ }
+
+ @Override
+ public int size() {
+ // TODO this is the tortoise version, need a fast version for estimation
+ // note: more than 2^31-1 records will cause this counter to wrap
+ int size = 0;
+ HDFSIterator iter = new HDFSIterator(null, region.getPartitionedRegion(), false);
+ try {
+ while (iter.hasNext()) {
+ if (includeEntry(iter.next())) {
+ size++;
+ }
+ }
+ } finally {
+ iter.close();
+ }
+ return size;
+ }
+
+ @Override
+ public boolean isEmpty() {
+ HDFSIterator iter = new HDFSIterator(null, region.getPartitionedRegion(), false);
+ try {
+ while (iter.hasNext()) {
+ if (includeEntry(iter.next())) {
+ return false;
+ }
+ }
+ } finally {
+ iter.close();
+ }
+ return true;
+ }
+
+ private boolean includeEntry(Object val) {
+ if (val instanceof HDFSGatewayEventImpl) {
+ HDFSGatewayEventImpl evt = (HDFSGatewayEventImpl) val;
+ if (evt.getOperation().isDestroy()) {
+ return false;
+ }
+ } else if (val instanceof PersistedEventImpl) {
+ PersistedEventImpl evt = (PersistedEventImpl) val;
+ if (evt.getOperation().isDestroy()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public class HDFSIterator implements Iterator {
+ private final IteratorType type;
+ private final boolean deserialize;
+
+ private final SortedEventQueueIterator queue;
+ private final HoplogIterator<byte[], SortedHoplogPersistedEvent> hdfs;
+ private Iterator txCreatedEntryIterator;
+
+ private boolean queueNext;
+ private boolean hdfsNext;
+ private boolean forUpdate;
+ private boolean hasTxEntry;
+
+ private byte[] currentHdfsKey;
+
+ public HDFSIterator(IteratorType type, Region region, boolean deserialize) {
+ this.type = type;
+ this.deserialize = deserialize;
+
+      // Check whether the queue has become primary here.
+      // There can be some time between the bucket becoming primary
+      // and the underlying queue becoming primary, so isPrimaryWithWait()
+      // waits for some time for the queue to become primary on this member
+ if (!brq.getBucketAdvisor().isPrimaryWithWait()) {
+ InternalDistributedMember primaryHolder = brq.getBucketAdvisor()
+ .basicGetPrimaryMember();
+ throw new PrimaryBucketException("Bucket " + brq.getName()
+ + " is not primary. Current primary holder is " + primaryHolder);
+ }
+      // We are deliberately NOT sync'ing while creating the iterators. If done
+      // in the correct order, we may get duplicates (due to an in-progress
+      // flush) but we won't miss any entries. The dupes will be eliminated
+      // during iteration.
+ queue = brq.iterator(region);
+ advanceQueue();
+
+ HoplogIterator<byte[], SortedHoplogPersistedEvent> tmp = null;
+ try {
+ tmp = hoplogs.scan();
+ } catch (IOException e) {
+ HDFSEntriesSet.this.region.checkForPrimary();
+ throw new HDFSIOException(LocalizedStrings.HOPLOG_FAILED_TO_READ_HDFS_FILE.toLocalizedString(e.getMessage()), e);
+ }
+
+ hdfs = tmp;
+ if (hdfs != null) {
+ advanceHdfs();
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ boolean nonTxHasNext = hdfsNext || queueNext;
+ if (!nonTxHasNext && this.txCreatedEntryIterator != null) {
+ this.hasTxEntry = this.txCreatedEntryIterator.hasNext();
+ return this.hasTxEntry;
+ }
+ return nonTxHasNext;
+ }
+
+ @Override
+ public Object next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ if (hasTxEntry) {
+ hasTxEntry = false;
+ return this.txCreatedEntryIterator.next();
+ }
+
+ Object val;
+ if (!queueNext) {
+ val = getFromHdfs();
+ advanceHdfs();
+
+ } else if (!hdfsNext) {
+ val = getFromQueue();
+ advanceQueue();
+
+ } else {
+ byte[] qKey = queue.current().getSerializedKey();
+ byte[] hKey = this.currentHdfsKey;
+
+ int diff = Bytes.compareTo(qKey, hKey);
+ if (diff < 0) {
+ val = getFromQueue();
+ advanceQueue();
+
+ } else if (diff == 0) {
+ val = getFromQueue();
+ advanceQueue();
+
+ // ignore the duplicate
+ advanceHdfs();
+
+ } else {
+ val = getFromHdfs();
+ advanceHdfs();
+ }
+ }
+ return val;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ public void close() {
+ if (queueNext) {
+ queue.close();
+ }
+
+ if (hdfsNext) {
+ hdfs.close();
+ }
+ }
+
+ private Object getFromQueue() {
+ HDFSGatewayEventImpl evt = queue.current();
+ if (type == null) {
+ return evt;
+ }
+
+ switch (type) {
+ case KEYS:
+ byte[] key = evt.getSerializedKey();
+ return deserialize ? EntryEventImpl.deserialize(key) : key;
+
+ case VALUES:
+ return evt.getValue();
+
+ default:
+ Object keyObj = EntryEventImpl.deserialize(evt.getSerializedKey());
+ if(keyObj instanceof KeyWithRegionContext) {
+ ((KeyWithRegionContext)keyObj).setRegionContext(region.getPartitionedRegion());
+ }
+ return ((HDFSRegionMap) region.getRegionMap()).getDelegate().getEntryFromEvent(keyObj, evt, true, forUpdate);
+ }
+ }
+
+ private Object getFromHdfs() {
+ if (type == null) {
+ return hdfs.getValue();
+ }
+
+ switch (type) {
+ case KEYS:
+ byte[] key = this.currentHdfsKey;
+ return deserialize ? EntryEventImpl.deserialize(key) : key;
+
+ case VALUES:
+ PersistedEventImpl evt = hdfs.getValue();
+ return evt.getValue();
+
+ default:
+ Object keyObj = EntryEventImpl.deserialize(this.currentHdfsKey);
+ if(keyObj instanceof KeyWithRegionContext) {
+ ((KeyWithRegionContext)keyObj).setRegionContext(region.getPartitionedRegion());
+ }
+ return ((HDFSRegionMap) region.getRegionMap()).getDelegate().getEntryFromEvent(keyObj, hdfs.getValue(), true, forUpdate);
+ }
+ }
+
+ private void advanceHdfs() {
+ if (hdfsNext = hdfs.hasNext()) {
+ try {
+ this.currentHdfsKey = hdfs.next();
+ } catch (IOException e) {
+ region.checkForPrimary();
+ throw new HDFSIOException(LocalizedStrings.HOPLOG_FAILED_TO_READ_HDFS_FILE.toLocalizedString(e.getMessage()), e);
+ }
+ } else {
+ this.currentHdfsKey = null;
+ hdfs.close();
+ }
+ }
+
+ private void advanceQueue() {
+ if (queueNext = queue.hasNext()) {
+ queue.next();
+ } else {
+ brq.checkForPrimary();
+ queue.close();
+ }
+ }
+
+ public void setForUpdate(){
+ this.forUpdate = true;
+ }
+
+    /** MergeGemXDHDFSToGFE: not sure if this function is required */
+ /*public void setTXState(TXState txState) {
+ TXRegionState txr = txState.getTXRegionState(region);
+ if (txr != null) {
+ txr.lock();
+ try {
+ this.txCreatedEntryIterator = txr.getCreatedEntryKeys().iterator();
+ }
+ finally{
+ txr.unlock();
+ }
+ }
+ }*/
+ }
+}
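The duplicate elimination in HDFSIterator.next() above follows the classic sorted-merge pattern: advance whichever side has the smaller key, and on a tie take the queue copy and skip the HDFS one. A self-contained sketch of the same rule over two pre-sorted key lists (a hypothetical helper, not part of this commit):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.hbase.util.Bytes;

    // On equal keys the queue entry wins and the HDFS duplicate is skipped,
    // mirroring the diff == 0 branch of HDFSIterator.next().
    static List<byte[]> mergePreferQueue(List<byte[]> queue, List<byte[]> hdfs) {
      List<byte[]> out = new ArrayList<byte[]>();
      int q = 0, h = 0;
      while (q < queue.size() && h < hdfs.size()) {
        int diff = Bytes.compareTo(queue.get(q), hdfs.get(h));
        if (diff < 0) {
          out.add(queue.get(q++));   // queue key is smaller
        } else if (diff == 0) {
          out.add(queue.get(q++));   // take the in-memory copy
          h++;                       // ignore the duplicate from HDFS
        } else {
          out.add(hdfs.get(h++));    // hdfs key is smaller
        }
      }
      while (q < queue.size()) out.add(queue.get(q++));
      while (h < hdfs.size()) out.add(hdfs.get(h++));
      return out;
    }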
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventListener.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventListener.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventListener.java
new file mode 100644
index 0000000..607650f
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventListener.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEvent;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer;
+import com.gemstone.gemfire.i18n.LogWriterI18n;
+import com.gemstone.gemfire.i18n.StringId;
+import com.gemstone.gemfire.internal.cache.BucketRegion;
+import com.gemstone.gemfire.internal.cache.ForceReattemptException;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.PrimaryBucketException;
+import com.gemstone.gemfire.internal.cache.execute.BucketMovedException;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+
+
+/**
+ * Listener that persists events to HDFS.
+ */
+public class HDFSEventListener implements AsyncEventListener {
+ private final LogWriterI18n logger;
+ private volatile boolean senderStopped = false;
+
+ private final FailureTracker failureTracker = new FailureTracker(10L, 60 * 1000L, 1.5f);
+
+ public HDFSEventListener(LogWriterI18n logger) {
+ this.logger = logger;
+ }
+
+ @Override
+ public void close() {
+ senderStopped = true;
+ }
+
+ @Override
+ public boolean processEvents(List<AsyncEvent> events) {
+ if (Hoplog.NOP_WRITE) {
+ return true;
+ }
+
+    // The list of events that the async queue receives is sorted at the
+    // bucket level. Events for multiple regions are concatenated together,
+    // and events for multiple buckets are concatenated one after the
+    // other, e.g.:
+ //
+ // <Region1, Key1, bucket1>, <Region1, Key19, bucket1>,
+ // <Region1, Key4, bucket2>, <Region1, Key6, bucket2>
+ // <Region2, Key1, bucket1>, <Region2, Key4, bucket1>
+ // ..
+
+ Region previousRegion = null;
+ int prevBucketId = -1;
+ ArrayList<QueuedPersistentEvent> list = null;
+ boolean success = false;
+ try {
+ //Back off if we are experiencing failures
+ failureTracker.sleepIfRetry();
+
+ HoplogOrganizer bucketOrganizer = null;
+ for (AsyncEvent asyncEvent : events) {
+ if (senderStopped){
+ failureTracker.failure();
+ if (logger.fineEnabled()) {
+ logger.fine("HDFSEventListener.processEvents: Cache is closing down. Ignoring the batch of data.");
+ }
+ return false;
+ }
+ HDFSGatewayEventImpl hdfsEvent = (HDFSGatewayEventImpl)asyncEvent;
+ Region region = hdfsEvent.getRegion();
+
+ if (prevBucketId != hdfsEvent.getBucketId() || region != previousRegion){
+ if (prevBucketId != -1) {
+ bucketOrganizer.flush(list.iterator(), list.size());
+ success=true;
+ if (logger.fineEnabled()) {
+ logger.fine("Batch written to HDFS of size " + list.size() + " for region " + previousRegion);
+ }
+ }
+ bucketOrganizer = getOrganizer((PartitionedRegion) region, hdfsEvent.getBucketId());
+          // Bucket organizer can be null only when the bucket has moved. Throw an
+          // exception so that the batch is discarded.
+ if (bucketOrganizer == null)
+ throw new BucketMovedException("Bucket moved. BucketId: " + hdfsEvent.getBucketId() + " HDFSRegion: " + region.getName());
+ list = new ArrayList<QueuedPersistentEvent>();
+ }
+ try {
+ //TODO:HDFS check if there is any serialization overhead
+ list.add(new SortedHDFSQueuePersistedEvent(hdfsEvent));
+ } catch (ClassNotFoundException e) {
+ //TODO:HDFS add localized string
+ logger.warning(new StringId(0, "Error while converting HDFSGatewayEvent to PersistedEventImpl."), e);
+ return false;
+ }
+ prevBucketId = hdfsEvent.getBucketId();
+ previousRegion = region;
+
+ }
+ if (bucketOrganizer != null) {
+ bucketOrganizer.flush(list.iterator(), list.size());
+ success = true;
+
+ if (logger.fineEnabled()) {
+ logger.fine("Batch written to HDFS of size " + list.size() + " for region " + previousRegion);
+ }
+ }
+ } catch (IOException e) {
+ logger.warning(LocalizedStrings.HOPLOG_FLUSH_FOR_BATCH_FAILED, e);
+ return false;
+ }
+ catch (ForceReattemptException e) {
+ if (logger.fineEnabled())
+ logger.fine(e);
+ return false;
+ }
+ catch(PrimaryBucketException e) {
+ //do nothing, the bucket is no longer primary so we shouldn't get the same
+ //batch next time.
+ if (logger.fineEnabled())
+ logger.fine(e);
+ return false;
+ }
+ catch (BucketMovedException e) {
+ // do nothing, the bucket has moved so we shouldn't get the same
+ // batch next time.
+ if (logger.fineEnabled())
+ logger.fine(e);
+ return false;
+ }
+ catch (CacheClosedException e) {
+ if (logger.fineEnabled())
+ logger.fine(e);
+ // exit silently
+ return false;
+ } catch (InterruptedException e1) {
+ // restore the interrupt status before abandoning the batch
+ Thread.currentThread().interrupt();
+ if (logger.fineEnabled())
+ logger.fine(e1);
+ return false;
+ } finally {
+ failureTracker.record(success);
+ }
+
+ return true;
+ }
+
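+ /**
+ * Returns the hoplog organizer for the given bucket, or null if the bucket
+ * no longer has one (e.g. it has moved).
+ *
+ * @throws PrimaryBucketException if the bucket region is no longer hosted locally
+ */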
+ private HoplogOrganizer getOrganizer(PartitionedRegion region, int bucketId) {
+ BucketRegion br = region.getDataStore().getLocalBucketById(bucketId);
+ if (br == null) {
+ // The bucket is no longer hosted locally, most likely due to a rebalance.
+ throw new PrimaryBucketException("Bucket region is no longer available. Bucket id: " + bucketId + " region: " + region);
+ }
+
+ return br.getHoplogOrganizer();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventQueueFilter.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventQueueFilter.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventQueueFilter.java
new file mode 100644
index 0000000..0860e75
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventQueueFilter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.wan.GatewayEventFilter;
+import com.gemstone.gemfire.cache.wan.GatewayQueueEvent;
+import com.gemstone.gemfire.i18n.LogWriterI18n;
+
+/**
+ * Filter for the HDFS event queue; its current use is limited to ignoring
+ * bulk DML operations.
+ */
+public class HDFSEventQueueFilter implements GatewayEventFilter{
+ private LogWriterI18n logger;
+
+ public HDFSEventQueueFilter(LogWriterI18n logger) {
+ this.logger = logger;
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public boolean beforeEnqueue(GatewayQueueEvent event) {
+ Operation op = event.getOperation();
+
+ /* MergeGemXDHDFSToGFE - Disabled as it is gemxd specific
+ if (op == Operation.BULK_DML_OP) {
+ // On accessors there are no parallel queues, so with the current logic,
+ // the isSerialWanEnabled function in LocalRegion always returns true on
+ // an accessor. So when a bulk DML op is fired on an accessor, it gets
+ // distributed to other members. This filter was added to keep such bulk
+ // DML ops out of the parallel queues. This is not the most efficient
+ // approach, since filters run just before inserting into the queue;
+ // ideally bulk DML ops would be blocked before they are distributed.
+ if (logger.fineEnabled())
+ logger.fine("HDFSEventQueueFilter:beforeEnqueue: Disallowing insertion of a bulk DML in HDFS queue.");
+ return false;
+ }*/
+
+ return true;
+ }
+
+ @Override
+ public boolean beforeTransmit(GatewayQueueEvent event) {
+ // No op
+ return true;
+ }
+
+ @Override
+ public void afterAcknowledgement(GatewayQueueEvent event) {
+ // No op
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSGatewayEventImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSGatewayEventImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSGatewayEventImpl.java
new file mode 100644
index 0000000..db99e7e
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSGatewayEventImpl.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.EntryEvent;
+import com.gemstone.gemfire.internal.InternalDataSerializer;
+import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+import com.gemstone.gemfire.internal.cache.EnumListenerEvent;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.lru.Sizeable;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerHelper;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl;
+import com.gemstone.gemfire.internal.offheap.StoredObject;
+import com.gemstone.gemfire.internal.offheap.annotations.Retained;
+import com.gemstone.gemfire.internal.util.BlobHelper;
+
+
+/**
+ * Gateway event extended for HDFS functionality.
+ */
+public class HDFSGatewayEventImpl extends GatewaySenderEventImpl {
+
+ private static final long serialVersionUID = 4642852957292192406L;
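+ // The key is serialized eagerly so it can be written to HDFS without an
+ // extra serialization pass; the region version tag travels with the event.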
+ protected transient boolean keyIsSerialized = false;
+ protected byte[] serializedKey = null;
+ protected VersionTag versionTag;
+
+ public HDFSGatewayEventImpl(){
+ }
+
+ @Retained
+ public HDFSGatewayEventImpl(EnumListenerEvent operation, EntryEvent event,
+ Object substituteValue)
+ throws IOException {
+ super(operation, event, substituteValue);
+ initializeHDFSGatewayEventObject(event);
+ }
+
+ @Retained
+ public HDFSGatewayEventImpl(EnumListenerEvent operation, EntryEvent event,
+ Object substituteValue, boolean initialize, int bucketId) throws IOException {
+ super(operation, event,substituteValue, initialize, bucketId);
+ initializeHDFSGatewayEventObject(event);
+ }
+
+ @Retained
+ public HDFSGatewayEventImpl(EnumListenerEvent operation, EntryEvent event,
+ Object substituteValue, boolean initialize) throws IOException {
+ super(operation, event, substituteValue, initialize);
+ initializeHDFSGatewayEventObject(event);
+ }
+
+ protected HDFSGatewayEventImpl(HDFSGatewayEventImpl offHeapEvent) {
+ super(offHeapEvent);
+ this.keyIsSerialized = offHeapEvent.keyIsSerialized;
+ this.serializedKey = offHeapEvent.serializedKey;
+ this.versionTag = offHeapEvent.versionTag;
+ }
+
+ @Override
+ protected GatewaySenderEventImpl makeCopy() {
+ return new HDFSGatewayEventImpl(this);
+ }
+
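+ /**
+ * Serializes the key and captures the event's version tag, filling in the
+ * member id when the tag does not carry one.
+ */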
+ private void initializeHDFSGatewayEventObject(EntryEvent event)
+ throws IOException {
+
+ serializeKey();
+ versionTag = ((EntryEventImpl)event).getVersionTag();
+ if (versionTag != null && versionTag.getMemberID() == null) {
+ versionTag.setMemberID(((LocalRegion)getRegion()).getVersionMember());
+ }
+ }
+
+ private void serializeKey() throws IOException {
+ if (!keyIsSerialized && isInitialized()) {
+ this.serializedKey = CacheServerHelper.serialize(this.key);
+ keyIsSerialized = true;
+ }
+ }
+
+ /** MergeGemXDHDFSToGFE This function needs to be enabled if similar functionality is added to GatewaySenderEventImpl */
+ /*@Override
+ protected StoredObject obtainOffHeapValueBasedOnOp(EntryEventImpl event,
+ boolean hasNonWanDispatcher) {
+ return event.getOffHeapNewValue();
+ }*/
+
+ /** MergeGemXDHDFSToGFE This function needs to be enabled if similar functionality is added to GatewaySenderEventImpl */
+ /*@Override
+ protected Object obtainHeapValueBasedOnOp(EntryEventImpl event,
+ boolean hasNonWanDispatcher) {
+ return event.getRawNewValue(shouldApplyDelta());
+ }*/
+
+ @Override
+ protected boolean shouldApplyDelta() {
+ return true;
+ }
+
+ @Override
+ public void toData(DataOutput out) throws IOException {
+ super.toData(out);
+ DataSerializer.writeObject(this.versionTag, out);
+ }
+
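+ /** Writes the pre-serialized key bytes rather than serializing the key object again. */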
+ @Override
+ protected void serializeKey(DataOutput out) throws IOException {
+ DataSerializer.writeByteArray(this.serializedKey, out);
+ }
+
+ @Override
+ public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+ super.fromData(in);
+ this.versionTag = (VersionTag)DataSerializer.readObject(in);
+ }
+
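+ /** Reads the key bytes and eagerly deserializes them back into the key object. */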
+ @Override
+ protected void deserializeKey(DataInput in) throws IOException,
+ ClassNotFoundException {
+ this.serializedKey = DataSerializer.readByteArray(in);
+ this.key = BlobHelper.deserializeBlob(this.serializedKey,
+ InternalDataSerializer.getVersionForDataStreamOrNull(in), null);
+ keyIsSerialized = true;
+ }
+
+ @Override
+ public int getDSFID() {
+ return HDFS_GATEWAY_EVENT_IMPL;
+ }
+
+ public byte[] getSerializedKey() {
+ return this.serializedKey;
+ }
+
+ public VersionTag getVersionTag() {
+ return this.versionTag;
+ }
+
+ /**
+ * Returns the size in bytes that this event occupies when persisted to HDFS.
+ *
+ * @param writeOnly true if the region is HDFS write-only, i.e. events are stored unsorted
+ * @return the size of this event on HDFS in bytes
+ */
+ public int getSizeOnHDFSInBytes(boolean writeOnly) {
+ if (writeOnly) {
+ return UnsortedHDFSQueuePersistedEvent.getSizeInBytes(this.serializedKey.length,
+ getSerializedValueSize(), this.versionTag);
+ } else {
+ return SortedHDFSQueuePersistedEvent.getSizeInBytes(this.serializedKey.length,
+ getSerializedValueSize(), this.versionTag);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSIntegrationUtil.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSIntegrationUtil.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSIntegrationUtil.java
new file mode 100644
index 0000000..740a607
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSIntegrationUtil.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueueFactory;
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl;
+import com.gemstone.gemfire.cache.hdfs.HDFSStore;
+import com.gemstone.gemfire.i18n.LogWriterI18n;
+import com.gemstone.gemfire.internal.Assert;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+
+/**
+ * Contains utility functions for wiring regions to their HDFS event queues.
+ */
+public class HDFSIntegrationUtil {
+
+ public static AsyncEventQueue createDefaultAsyncQueueForHDFS(Cache cache, boolean writeOnly, String regionPath) {
+ return createAsyncQueueForHDFS(cache, regionPath, writeOnly, null);
+ }
+
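+ /**
+ * Creates a parallel async event queue for HDFS, configured from the given
+ * store configuration, or from the default configuration when none is given.
+ */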
+ private static AsyncEventQueue createAsyncQueueForHDFS(Cache cache, String regionPath, boolean writeOnly,
+ HDFSStore configView) {
+ LogWriterI18n logger = cache.getLoggerI18n();
+ String defaultAsyncQueueName = HDFSStoreFactoryImpl.getEventQueueName(regionPath);
+
+ if (configView == null) {
+ configView = new HDFSStoreFactoryImpl(cache).getConfigView();
+ }
+
+ AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
+ factory.setBatchSize(configView.getBatchSize());
+ factory.setPersistent(configView.getBufferPersistent());
+ factory.setDiskStoreName(configView.getDiskStoreName());
+ factory.setMaximumQueueMemory(configView.getMaxMemory());
+ factory.setBatchTimeInterval(configView.getBatchInterval());
+ factory.setDiskSynchronous(configView.getSynchronousDiskWrite());
+ factory.setDispatcherThreads(configView.getDispatcherThreads());
+ factory.setParallel(true);
+ factory.addGatewayEventFilter(new HDFSEventQueueFilter(logger));
+ ((AsyncEventQueueFactoryImpl) factory).setBucketSorted(!writeOnly);
+ ((AsyncEventQueueFactoryImpl) factory).setIsHDFSQueue(true);
+
+ AsyncEventQueue asyncQ;
+ if (!writeOnly) {
+ asyncQ = factory.create(defaultAsyncQueueName, new HDFSEventListener(cache.getLoggerI18n()));
+ } else {
+ asyncQ = factory.create(defaultAsyncQueueName, new HDFSWriteOnlyStoreEventListener(cache.getLoggerI18n()));
+ }
+
+ logger.fine("HDFS: async queue created for HDFS. Id: " + asyncQ.getId() + ". Disk store: "
+ + asyncQ.getDiskStoreName() + ". Batch size: " + asyncQ.getBatchSize() + ". bucket sorted: " + !writeOnly);
+ return asyncQ;
+ }
+
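+ /**
+ * Creates and registers the default HDFS event queue for a region whose data
+ * policy includes HDFS. This is a no-op on accessors (localMaxMemory == 0)
+ * and when the queue already exists.
+ */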
+ public static void createAndAddAsyncQueue(String regionPath, RegionAttributes regionAttributes, Cache cache) {
+ if (!regionAttributes.getDataPolicy().withHDFS()) {
+ return;
+ }
+
+ String leaderRegionPath = getLeaderRegionPath(regionPath, regionAttributes, cache);
+
+ String defaultAsyncQueueName = HDFSStoreFactoryImpl.getEventQueueName(leaderRegionPath);
+ if (cache.getAsyncEventQueue(defaultAsyncQueueName) == null) {
+ if (regionAttributes.getHDFSStoreName() != null && regionAttributes.getPartitionAttributes() != null
+ && regionAttributes.getPartitionAttributes().getLocalMaxMemory() != 0) {
+ HDFSStore store = ((GemFireCacheImpl) cache).findHDFSStore(regionAttributes.getHDFSStoreName());
+ if (store == null) {
+ throw new IllegalStateException(
+ LocalizedStrings.HOPLOG_HDFS_STORE_NOT_FOUND.toLocalizedString(regionAttributes.getHDFSStoreName()));
+ }
+ createAsyncQueueForHDFS(cache, leaderRegionPath, regionAttributes.getHDFSWriteOnly(), store);
+ }
+ }
+ }
+
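+ /**
+ * Walks up the colocation chain to find the leader region's path; colocated
+ * regions share the leader region's HDFS event queue.
+ */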
+ private static String getLeaderRegionPath(String regionPath, RegionAttributes regionAttributes, Cache cache) {
+ String colocated;
+ while (regionAttributes.getPartitionAttributes() != null
+ && (colocated = regionAttributes.getPartitionAttributes().getColocatedWith()) != null) {
+ // Do not waitOnInitialization() for PR
+ GemFireCacheImpl gfc = (GemFireCacheImpl) cache;
+ Region colocatedRegion = gfc.getPartitionedRegion(colocated, false);
+ if (colocatedRegion == null) {
+ Assert.fail("Could not find parent region " + colocated + " for " + regionPath);
+ }
+ regionAttributes = colocatedRegion.getAttributes();
+ regionPath = colocatedRegion.getFullPath();
+ }
+ return regionPath;
+ }
+
+}