You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/04/27 22:50:00 UTC
[14/25] incubator-geode git commit: GEODE-10: Reinstating HDFS
persistence code
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
index c477466..db14e57 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
@@ -126,6 +126,16 @@ import com.gemstone.gemfire.cache.client.internal.ClientMetadataService;
import com.gemstone.gemfire.cache.client.internal.ClientRegionFactoryImpl;
import com.gemstone.gemfire.cache.client.internal.PoolImpl;
import com.gemstone.gemfire.cache.execute.FunctionService;
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSIntegrationUtil;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreCreation;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreFactoryImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSFlushQueueFunction;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSForceCompactionFunction;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSLastCompactionTimeFunction;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSStoreDirector;
import com.gemstone.gemfire.cache.query.QueryService;
import com.gemstone.gemfire.cache.query.internal.DefaultQuery;
import com.gemstone.gemfire.cache.query.internal.DefaultQueryService;
@@ -922,6 +932,9 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
HARegionQueue.setMessageSyncInterval(HARegionQueue.DEFAULT_MESSAGE_SYNC_INTERVAL);
}
FunctionService.registerFunction(new PRContainsValueFunction());
+ FunctionService.registerFunction(new HDFSLastCompactionTimeFunction());
+ FunctionService.registerFunction(new HDFSForceCompactionFunction());
+ FunctionService.registerFunction(new HDFSFlushQueueFunction());
this.expirationScheduler = new ExpirationScheduler(this.system);
// uncomment following line when debugging CacheExistsException
@@ -2172,6 +2185,8 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
closeDiskStores();
diskMonitor.close();
+ closeHDFSStores();
+
// Close the CqService Handle.
try {
if (isDebugEnabled) {
@@ -2257,6 +2272,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
} catch (CancelException e) {
// make sure the disk stores get closed
closeDiskStores();
+ closeHDFSStores();
// NO DISTRIBUTED MESSAGING CAN BE DONE HERE!
// okay, we're taking too long to do this stuff, so let's
@@ -3103,6 +3119,8 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
future = (Future) this.reinitializingRegions.get(fullPath);
}
if (future == null) {
+ HDFSIntegrationUtil.createAndAddAsyncQueue(regionPath, attrs, this);
+ attrs = setEvictionAttributesForLargeRegion(attrs);
if (internalRegionArgs.getInternalMetaRegion() != null) {
rgn = internalRegionArgs.getInternalMetaRegion();
} else if (isPartitionedRegion) {
@@ -3227,6 +3245,54 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
}
}
+ /**
+ * turn on eviction by default for HDFS regions
+ */
+ @SuppressWarnings("deprecation")
+ public <K, V> RegionAttributes<K, V> setEvictionAttributesForLargeRegion(
+ RegionAttributes<K, V> attrs) {
+ RegionAttributes<K, V> ra = attrs;
+ if (DISABLE_AUTO_EVICTION) {
+ return ra;
+ }
+ if (attrs.getDataPolicy().withHDFS()
+ || attrs.getHDFSStoreName() != null) {
+ // make the region overflow by default
+ EvictionAttributes evictionAttributes = attrs.getEvictionAttributes();
+ boolean hasNoEvictionAttrs = evictionAttributes == null
+ || evictionAttributes.getAlgorithm().isNone();
+ AttributesFactory<K, V> af = new AttributesFactory<K, V>(attrs);
+ String diskStoreName = attrs.getDiskStoreName();
+ // set the local persistent directory to be the same as that for
+ // HDFS store
+ if (attrs.getHDFSStoreName() != null) {
+ HDFSStoreImpl hdfsStore = findHDFSStore(attrs.getHDFSStoreName());
+ if (attrs.getPartitionAttributes().getLocalMaxMemory() != 0 && hdfsStore == null) {
+ // HDFS store expected to be found at this point
+ throw new IllegalStateException(
+ LocalizedStrings.HOPLOG_HDFS_STORE_NOT_FOUND
+ .toLocalizedString(attrs.getHDFSStoreName()));
+ }
+ // if there is no disk store, use the one configured for hdfs queue
+ if (attrs.getPartitionAttributes().getLocalMaxMemory() != 0 && diskStoreName == null) {
+ diskStoreName = hdfsStore.getDiskStoreName();
+ }
+ }
+ // set LRU heap eviction with overflow to disk for HDFS stores with
+ // local Oplog persistence
+ // set eviction attributes only if not set
+ if (hasNoEvictionAttrs) {
+ if (diskStoreName != null) {
+ af.setDiskStoreName(diskStoreName);
+ }
+ af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes(
+ ObjectSizer.DEFAULT, EvictionAction.OVERFLOW_TO_DISK));
+ }
+ ra = af.create();
+ }
+ return ra;
+ }
+
public final Region getRegion(String path) {
return getRegion(path, false);
}
@@ -4944,6 +5010,48 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
c.setRegionAttributes(pra.toString(), af.create());
break;
}
+ case PARTITION_HDFS: {
+ AttributesFactory af = new AttributesFactory();
+ af.setDataPolicy(DataPolicy.HDFS_PARTITION);
+ PartitionAttributesFactory paf = new PartitionAttributesFactory();
+ af.setPartitionAttributes(paf.create());
+ af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
+ af.setHDFSWriteOnly(false);
+ c.setRegionAttributes(pra.toString(), af.create());
+ break;
+ }
+ case PARTITION_REDUNDANT_HDFS: {
+ AttributesFactory af = new AttributesFactory();
+ af.setDataPolicy(DataPolicy.HDFS_PARTITION);
+ PartitionAttributesFactory paf = new PartitionAttributesFactory();
+ paf.setRedundantCopies(1);
+ af.setPartitionAttributes(paf.create());
+ af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
+ af.setHDFSWriteOnly(false);
+ c.setRegionAttributes(pra.toString(), af.create());
+ break;
+ }
+ case PARTITION_WRITEONLY_HDFS_STORE: {
+ AttributesFactory af = new AttributesFactory();
+ af.setDataPolicy(DataPolicy.HDFS_PARTITION);
+ PartitionAttributesFactory paf = new PartitionAttributesFactory();
+ af.setPartitionAttributes(paf.create());
+ af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
+ af.setHDFSWriteOnly(true);
+ c.setRegionAttributes(pra.toString(), af.create());
+ break;
+ }
+ case PARTITION_REDUNDANT_WRITEONLY_HDFS_STORE: {
+ AttributesFactory af = new AttributesFactory();
+ af.setDataPolicy(DataPolicy.HDFS_PARTITION);
+ PartitionAttributesFactory paf = new PartitionAttributesFactory();
+ paf.setRedundantCopies(1);
+ af.setPartitionAttributes(paf.create());
+ af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
+ af.setHDFSWriteOnly(true);
+ c.setRegionAttributes(pra.toString(), af.create());
+ break;
+ }
default:
throw new IllegalStateException("unhandled enum " + pra);
}
@@ -5337,6 +5445,45 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
}
}
+ /**
+ * Creates a factory for HDFS stores.
+ *
+ * @return a new {@link HDFSStoreFactoryImpl} constructed with this cache
+ */
+ @Override
+ public HDFSStoreFactory createHDFSStoreFactory() {
+ return new HDFSStoreFactoryImpl(this);
+ }
+
+ /**
+ * Creates a factory for HDFS stores from an existing creation object.
+ *
+ * @param creation passed through to the {@link HDFSStoreFactoryImpl} constructor
+ * @return a new {@link HDFSStoreFactoryImpl} constructed with this cache and
+ * <code>creation</code>
+ */
+ public HDFSStoreFactory createHDFSStoreFactory(HDFSStoreCreation creation) {
+ return new HDFSStoreFactoryImpl(this, creation);
+ }
+ /**
+ * Registers an HDFS store with the {@link HDFSStoreDirector}.
+ * No resource event is raised for the creation (see the TODO below).
+ *
+ * @param hsi the store to register
+ */
+ public void addHDFSStore(HDFSStoreImpl hsi) {
+ HDFSStoreDirector.getInstance().addHDFSStore(hsi);
+ //TODO:HDFS Add a resource event for hdfs store creation as well
+ // like the following disk store event
+ //system.handleResourceEvent(ResourceEvent.DISKSTORE_CREATE, dsi);
+ }
+
+ /**
+ * Removes an HDFS store from the {@link HDFSStoreDirector}, looking it up by
+ * its name. The store itself is not destroyed here (the destroy call is
+ * commented out) and no resource event is raised.
+ *
+ * @param hsi the store to unregister; must not be null
+ */
+ public void removeHDFSStore(HDFSStoreImpl hsi) {
+ //hsi.destroy();
+ HDFSStoreDirector.getInstance().removeHDFSStore(hsi.getName());
+ //TODO:HDFS Add a resource event for hdfs store as well
+ // like the following disk store event
+ //system.handleResourceEvent(ResourceEvent.DISKSTORE_REMOVE, dsi);
+ }
+
+ /**
+ * Resets the {@link HDFSRegionDirector} and then asks the
+ * {@link HDFSStoreDirector} to close the HDFS stores. Called from the cache
+ * close paths right after the disk stores are closed.
+ */
+ public void closeHDFSStores() {
+ HDFSRegionDirector.reset();
+ HDFSStoreDirector.getInstance().closeHDFSStores();
+ }
+
+
+ /**
+ * Looks up an HDFS store by name in the {@link HDFSStoreDirector}.
+ *
+ * @param name the store name
+ * @return the store the director returns for <code>name</code>; callers in
+ * this class treat <code>null</code> as "store not found"
+ */
+ public HDFSStoreImpl findHDFSStore(String name) {
+ return HDFSStoreDirector.getInstance().getHDFSStore(name);
+ }
+
+ /**
+ * @return all HDFS stores as reported by the {@link HDFSStoreDirector}
+ */
+ public Collection<HDFSStoreImpl> getHDFSStores() {
+ return HDFSStoreDirector.getInstance().getAllHDFSStores();
+ }
+
+
public TemporaryResultSetFactory getResultSetFactory() {
return this.resultSetFactory;
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HARegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HARegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HARegion.java
index c924be5..3896800 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HARegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HARegion.java
@@ -373,20 +373,13 @@ public final class HARegion extends DistributedRegion
/**
* @return the deserialized value
- * @see LocalRegion#findObjectInSystem(KeyInfo, boolean, TXStateInterface, boolean, Object, boolean, boolean, ClientProxyMembershipID, EntryEventImpl, boolean)
+ * @see DistributedRegion#findObjectInSystem(KeyInfo, boolean, TXStateInterface, boolean, Object, boolean, boolean, ClientProxyMembershipID, EntryEventImpl, boolean, boolean)
*
*/
@Override
- protected Object findObjectInSystem(KeyInfo keyInfo,
- boolean isCreate,
- TXStateInterface txState,
- boolean generateCallbacks,
- Object localValue,
- boolean disableCopyOnRead,
- boolean preferCD,
- ClientProxyMembershipID requestingClient,
- EntryEventImpl clientEvent,
- boolean returnTombstones)
+ protected Object findObjectInSystem(KeyInfo keyInfo, boolean isCreate,
+ TXStateInterface txState, boolean generateCallbacks, Object localValue, boolean disableCopyOnRead,
+ boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS)
throws CacheLoaderException, TimeoutException {
Object value = null;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSLRURegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSLRURegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSLRURegionMap.java
new file mode 100644
index 0000000..f6c6aa7
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSLRURegionMap.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import java.util.Collection;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.CacheWriterException;
+import com.gemstone.gemfire.cache.TimeoutException;
+import com.gemstone.gemfire.i18n.LogWriterI18n;
+import com.gemstone.gemfire.internal.cache.lru.EnableLRU;
+import com.gemstone.gemfire.internal.cache.lru.LRUEntry;
+import com.gemstone.gemfire.internal.cache.lru.NewLRUClockHand;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * Implementation of RegionMap that reads data from HDFS and adds LRU behavior.
+ * Entry lookup, iteration and sizing are forwarded to an
+ * {@link HDFSRegionMapDelegate}; the LRU state required by
+ * {@link AbstractLRURegionMap} (eviction helper and LRU list) is held here.
+ */
+public class HDFSLRURegionMap extends AbstractLRURegionMap implements HDFSRegionMap {
+
+ // not referenced anywhere in this class
+ private static final Logger logger = LogService.getLogger();
+
+ /** Performs the HDFS-aware lookups; created at the end of the constructor. */
+ private final HDFSRegionMapDelegate delegate;
+
+ /**
+ * A tool from the eviction controller for sizing entries and
+ * expressing limits.
+ */
+ private EnableLRU ccHelper;
+
+ /** The list of nodes in LRU order */
+ private NewLRUClockHand lruList;
+
+ // value of the "hdfsRegionMap.DEBUG" system property; not referenced in this class
+ private static final boolean DEBUG = Boolean.getBoolean("hdfsRegionMap.DEBUG");
+
+ /**
+ * Creates the map for a bucket region.
+ *
+ * @param owner the region that owns this map; asserted to be a BucketRegion
+ * @param attrs map attributes, passed to <code>initialize</code> and to the delegate
+ * @param internalRegionArgs internal creation arguments, passed to the
+ * superclass, to <code>initialize</code> and to the delegate
+ */
+ public HDFSLRURegionMap(LocalRegion owner, Attributes attrs,
+ InternalRegionArguments internalRegionArgs) {
+ super(internalRegionArgs);
+ assert owner instanceof BucketRegion;
+ // NOTE(review): initialize() runs while delegate is still null; confirm it
+ // never calls back into getEntry/size/isEmpty/regionEntries of this class.
+ initialize(owner, attrs, internalRegionArgs);
+ this.delegate = new HDFSRegionMapDelegate(owner, attrs, internalRegionArgs, this);
+ }
+
+ /**
+ * Looks up an entry through the delegate, without an associated event.
+ */
+ @Override
+ public RegionEntry getEntry(Object key) {
+ return delegate.getEntry(key, null);
+ }
+
+ /**
+ * Looks up the entry for an event through the delegate.
+ */
+ @Override
+ protected RegionEntry getEntry(EntryEventImpl event) {
+ return delegate.getEntry(event);
+ }
+
+ /**
+ * @return the entries as reported by the delegate
+ */
+ @Override
+ @SuppressWarnings("unchecked")
+ public Collection<RegionEntry> regionEntries() {
+ return delegate.regionEntries();
+ }
+
+ /**
+ * @return the size as reported by the delegate
+ */
+ @Override
+ public int size() {
+ return delegate.size();
+ }
+
+ /**
+ * @return true if the delegate reports no entries
+ */
+ @Override
+ public boolean isEmpty() {
+ return delegate.isEmpty();
+ }
+
+ /** Stores the eviction helper for this map. */
+ @Override
+ protected void _setCCHelper(EnableLRU ccHelper) {
+ this.ccHelper = ccHelper;
+ }
+
+ /** @return the eviction helper previously stored by {@link #_setCCHelper} */
+ @Override
+ protected EnableLRU _getCCHelper() {
+ return this.ccHelper;
+ }
+
+ /** Stores the LRU list for this map. */
+ @Override
+ protected void _setLruList(NewLRUClockHand lruList) {
+ this.lruList = lruList;
+ }
+
+ /** @return the LRU list previously stored by {@link #_setLruList} */
+ @Override
+ protected NewLRUClockHand _getLruList() {
+ return this.lruList;
+ }
+
+ /** @return the delegate that performs the HDFS-aware lookups */
+ @Override
+ public HDFSRegionMapDelegate getDelegate() {
+ return this.delegate;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMap.java
new file mode 100644
index 0000000..2a7baef
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMap.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache;
+
+/**
+ * Marker contract for RegionMap implementations that read from HDFS.
+ * Such a map exposes the {@link HDFSRegionMapDelegate} it forwards to.
+ */
+public interface HDFSRegionMap {
+
+  /**
+   * @return the {@link HDFSRegionMapDelegate} that does all the work
+   */
+  HDFSRegionMapDelegate getDelegate();
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMapDelegate.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMapDelegate.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMapDelegate.java
new file mode 100644
index 0000000..a2ef653
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMapDelegate.java
@@ -0,0 +1,540 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import java.io.IOException;
+import java.lang.ref.Reference;
+import java.lang.ref.ReferenceQueue;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.EvictionAction;
+import com.gemstone.gemfire.cache.CustomEvictionAttributes;
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
+import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSEntriesSet;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSEntriesSet.HDFSIterator;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSGatewayEventImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSParallelGatewaySenderQueue;
+import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.SortedHoplogPersistedEvent;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.i18n.LogWriterI18n;
+import com.gemstone.gemfire.internal.cache.LocalRegion.IteratorType;
+import com.gemstone.gemfire.internal.cache.RegionMap.Attributes;
+import com.gemstone.gemfire.internal.cache.lru.EnableLRU;
+import com.gemstone.gemfire.internal.cache.lru.LRUEntry;
+import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector;
+import com.gemstone.gemfire.internal.cache.versions.VersionStamp;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
+import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.internal.util.concurrent.FutureResult;
+
+/**
+ * This class encapsulates all the functionality of HDFSRegionMap, so
+ * that it can be provided to HDFSLRURegionMap.
+ *
+ */
+public class HDFSRegionMapDelegate {
+
+ private static final Logger logger = LogService.getLogger();
+
+ private final BucketRegion owner;
+
+ private ConcurrentParallelGatewaySenderQueue hdfsQueue;
+
+ private final RegionMap backingRM;
+
+ /** queue of dead iterators */
+ private final ReferenceQueue<HDFSIterator> refs;
+
+ private static final boolean DEBUG = Boolean.getBoolean("hdfsRegionMap.DEBUG");
+
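+ /**
+ * HDFS fetches currently in flight, keyed by entry key. A reader that finds
+ * a fetch already registered for its key waits for that fetch's result.
+ */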
+ private ConcurrentMap<Object, FutureResult> futures = new ConcurrentHashMap<Object, FutureResult>();
+
+  /**
+   * Creates the delegate for the region map of one bucket.
+   *
+   * @param owner the region owning the map; asserted and cast to BucketRegion
+   * @param attrs not used by this constructor
+   * @param internalRegionArgs not used by this constructor
+   * @param backingRM the region map that holds the in-memory entries
+   */
+  public HDFSRegionMapDelegate(LocalRegion owner, Attributes attrs,
+      InternalRegionArguments internalRegionArgs, RegionMap backingRM) {
+    assert owner instanceof BucketRegion;
+    this.refs = new ReferenceQueue<HDFSIterator>();
+    this.backingRM = backingRM;
+    this.owner = (BucketRegion) owner;
+  }
+
+ /**
+ * Looks up the entry for a key, forcing the on-heap form of the entry.
+ * The returned entry is not inserted into the backing map here: the
+ * transactional handling that used to do so is disabled and kept below
+ * only for reference.
+ *
+ * @param key the entry key
+ * @param event the event driving the lookup; may be null
+ * @return the entry found, or null
+ */
+ public RegionEntry getEntry(Object key, EntryEventImpl event) {
+
+ RegionEntry re = getEntry(key, event, true);
+ // get from tx should put the entry back in map
+ // it should be evicted once tx completes
+ /**MergeGemXDHDFSToGFE txstate does not apply for this*/
+ /* if (re != null && getTXState(event) != null) {
+ if (re != null) {
+ // put the region entry in backing CHM of AbstractRegionMap so that
+ // it can be locked in basicPut/destroy
+ RegionEntry oldRe = backingRM.putEntryIfAbsent(key, re);
+ if (oldRe != null) {
+ if (re instanceof OffHeapRegionEntry && !oldRe.equals(re)) {
+ ((OffHeapRegionEntry)re).release();
+ }
+ return oldRe;
+ }
+ re.setMarkedForEviction();
+ owner.updateSizeOnCreate(key,
+ owner.calculateRegionEntryValueSize(re));
+ ((AbstractRegionMap)backingRM).incEntryCount(1);
+ ((AbstractRegionMap)backingRM).lruEntryCreate(re);
+ }*/
+ return re;
+ }
+
+ /*
+ private TXStateInterface getTXState(EntryEventImpl event) {
+ return event != null ? event.getTXState(this.owner) : this.owner
+ .getTXState();
+ }*/
+
+ /**
+ * Looks up the entry for a key: first in the backing in-memory map and, when
+ * permitted, in the HDFS event queue and then in HDFS itself.
+ * <p>
+ * HDFS is consulted only when the in-memory entry is missing (or is removed
+ * without being a tombstone), this bucket is primary and
+ * {@link #allowReadFromHDFS()} returns true. A tombstone read back from HDFS
+ * is dropped when the region has no version vector or the vector reports
+ * the tombstone as too old.
+ *
+ * @param key the entry key
+ * @param event the event driving the lookup, may be null; it is flagged with
+ * <code>setLoadedFromHDFS(true)</code> when a non-tombstone entry was read
+ * from HDFS or when the entry found is marked for eviction
+ * @param forceOnHeap if true will return heap version of off-heap region entries
+ * @return the entry, or null if none was found
+ * @throws PrimaryBucketException if the HDFS lookup fails with a ForceReattemptException
+ * @throws HDFSIOException if the HDFS lookup fails with an IOException
+ */
+ private RegionEntry getEntry(Object key, EntryEventImpl event, boolean forceOnHeap) {
+ closeDeadIterators();
+
+ RegionEntry re = backingRM.getEntryInVM(key);
+ // diagnostics are emitted at info level when trace logging or the DEBUG flag is on
+ if (logger.isTraceEnabled() || DEBUG) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "HDFS: Found the key in CHM: " + key
+ + " ,value=" + (re == null? "null" : "[" + re._getValue() + " or (" + re.getValueAsToken() + ")]")));
+ }
+ if ((re == null || (re.isRemoved() && !re.isTombstone()))
+ && owner.getBucketAdvisor().isPrimary() && allowReadFromHDFS()) {
+ if (logger.isTraceEnabled() || DEBUG) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "HDFS: fetching from hdfs key:" + key));
+ }
+ try {
+ this.owner.getPartitionedRegion().hdfsCalled(key);
+ // if another thread is already fetching this key, wait for its result
+ re = getEntryFromFuture(key);
+ if (re != null) {
+ // NOTE(review): the finally block below also runs on this early return,
+ // so notifyFuture removes and completes whatever future is registered
+ // for the key at that moment - confirm this is intended.
+ return re;
+ }
+
+ assert this.owner.getPartitionedRegion().getDataPolicy()
+ .withHDFS();
+ byte[] k = EntryEventImpl.serialize(key);
+
+ // for destroy ops we will retain the entry in the region map so
+ // tombstones can be tracked
+ //final boolean forceOnHeap = (event==null || !event.getOperation().isDestroy());
+
+ // get from queue
+ re = getFromHDFSQueue(key, k, forceOnHeap);
+ if (re == null) {
+ // get from HDFS
+ re = getFromHDFS(key, k, forceOnHeap);
+ }
+ if (re != null && re.isTombstone()) {
+ RegionVersionVector vector = this.owner.getVersionVector();
+// if (vector == null) {
+// this.owner.getLogWriterI18n().info(LocalizedStrings.DEBUG,
+// "found a tombstone in a region w/o a version vector: " + re + "; region: " + this.owner);
+// }
+ // a tombstone that cannot be validated, or that is too old, is treated as "no entry"
+ if (vector == null
+ || vector.isTombstoneTooOld(re.getVersionStamp().getMemberID(),
+ re.getVersionStamp().getRegionVersion())) {
+ re = null;
+ }
+ }
+ if (logger.isTraceEnabled() || DEBUG) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "HDFS: returning from hdfs re:" + re));
+ }
+ } catch (ForceReattemptException e) {
+ throw new PrimaryBucketException(e.getLocalizedMessage(), e);
+ } catch (IOException e) {
+ throw new HDFSIOException("Error reading from HDFS", e);
+ } finally {
+ // hand the result (possibly null) to any thread waiting on this key
+ notifyFuture(key, re);
+ // If we mark it here, the table scan may miss it causing updates/delete using table
+ // scan to fail.
+// if (re != null) {
+// re.setMarkedForEviction();
+// }
+ if(re != null && event != null && !re.isTombstone()) {
+ if (logger.isTraceEnabled() || DEBUG) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "HDFS: loaded from hdfs re:" + re));
+ }
+ // br is not used below
+ BucketRegion br = (BucketRegion)owner;
+ //CustomEvictionAttributes csAttr = br.getCustomEvictionAttributes();
+ //if(csAttr!=null)
+ event.setLoadedFromHDFS(true);
+ }
+ }
+ }
+ // an entry already marked for eviction is also reported as loaded from HDFS
+ if(re!=null && re.isMarkedForEviction() && !re.isTombstone()) {
+ if(event!=null) {
+ event.setLoadedFromHDFS(true);
+ }
+ }
+
+ return re;
+ }
+
+  /**
+   * Tells whether a RegionEntry may be read from HDFS.
+   * <p>
+   * Reading is allowed when the bucket is not persistent, when custom
+   * eviction attributes are configured, or when the eviction action is
+   * local destroy. Persistent regions that define no such eviction criteria
+   * never read from HDFS (fixes #49101).
+   *
+   * @return true if RegionEntry should be read from HDFS
+   */
+  private boolean allowReadFromHDFS() {
+    // When the region does not have persistence we have to read from HDFS
+    // (even though there is no eviction criteria) for constraint checks.
+    // MergeGemXDHDFSToGFE: the owner.isUsedForIndex() alternative is used for
+    // global index only, hence not required here.
+    return !owner.getDataPolicy().withPersistence()
+        || owner.getCustomEvictionAttributes() != null
+        || isEvictionActionLocalDestroy();
+  }
+
+ private boolean isEvictionActionLocalDestroy() {
+ PartitionedRegion pr = owner.getPartitionedRegion();
+ if (pr.getEvictionAttributes() != null) {
+ return pr.getEvictionAttributes().getAction() == EvictionAction.LOCAL_DESTROY;
+ }
+ return false;
+ }
+
+ protected RegionEntry getEntry(EntryEventImpl event) {
+ RegionEntry re = getEntry(event.getKey(), event, false);
+ if (re != null && event.isLoadedFromHDFS()) {
+ // put the region entry in backing CHM of AbstractRegionMap so that
+ // it can be locked in basicPut/destroy
+ RegionEntry oldRe = backingRM.putEntryIfAbsent(event.getKey(), re);
+ if (oldRe != null) {
+ if (re instanceof OffHeapRegionEntry && !oldRe.equals(re)) {
+ ((OffHeapRegionEntry) re).release();
+ }
+ return oldRe;
+ }
+ // since the entry is faulted in from HDFS, it must have
+ // satisfied the eviction criteria in the past, so mark it for eviction
+ re.setMarkedForEviction();
+
+ owner.updateSizeOnCreate(event.getKey(), owner.calculateRegionEntryValueSize(re));
+ ((AbstractRegionMap) backingRM).incEntryCount(1);
+ ((AbstractRegionMap) backingRM).lruEntryCreate(re);
+ }
+ return re;
+ }
+
+  /**
+   * Returns the entries of this map. When the partitioned region asks for
+   * HDFS results an entries set of type ENTRIES is created; otherwise only
+   * the entries held in memory are returned.
+   *
+   * @throws PrimaryBucketException if creating the entries set fails with a
+   *         ForceReattemptException
+   */
+  @SuppressWarnings("unchecked")
+  public Collection<RegionEntry> regionEntries() {
+    closeDeadIterators();
+    if (owner.getPartitionedRegion().includeHDFSResults()) {
+      try {
+        return createEntriesSet(IteratorType.ENTRIES);
+      } catch (ForceReattemptException e) {
+        throw new PrimaryBucketException(e.getLocalizedMessage(), e);
+      }
+    }
+
+    if (logger.isDebugEnabled() || DEBUG) {
+      logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Ignoring HDFS results for #regionEntries"));
+    }
+    return backingRM.regionEntriesInVM();
+  }
+
+  /**
+   * Returns the number of entries. When the partitioned region asks for HDFS
+   * results the size of a KEYS entries set is returned; otherwise the
+   * in-memory size of the backing map.
+   *
+   * @throws PrimaryBucketException if creating the entries set fails with a
+   *         ForceReattemptException
+   */
+  public int size() {
+    closeDeadIterators();
+    if (owner.getPartitionedRegion().includeHDFSResults()) {
+      try {
+        return createEntriesSet(IteratorType.KEYS).size();
+      } catch (ForceReattemptException e) {
+        throw new PrimaryBucketException(e.getLocalizedMessage(), e);
+      }
+    }
+
+    if (logger.isDebugEnabled() || DEBUG) {
+      logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Ignoring HDFS results for #size"));
+    }
+    return backingRM.sizeInVM();
+  }
+
+  /**
+   * Tells whether there are no entries. When the partitioned region asks for
+   * HDFS results the answer comes from a KEYS entries set; otherwise from
+   * the in-memory size of the backing map.
+   *
+   * @throws PrimaryBucketException if creating the entries set fails with a
+   *         ForceReattemptException
+   */
+  public boolean isEmpty() {
+    closeDeadIterators();
+    if (owner.getPartitionedRegion().includeHDFSResults()) {
+      try {
+        return createEntriesSet(IteratorType.KEYS).isEmpty();
+      } catch (ForceReattemptException e) {
+        throw new PrimaryBucketException(e.getLocalizedMessage(), e);
+      }
+    }
+
+    if (logger.isDebugEnabled() || DEBUG) {
+      logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Ignoring HDFS results for #isEmpty"));
+    }
+    return backingRM.sizeInVM() == 0;
+  }
+
+ private void notifyFuture(Object key, RegionEntry re) {
+ FutureResult future = this.futures.remove(key);
+ if (future != null) {
+ future.set(re);
+ }
+ }
+
+  /**
+   * Registers a fetch for a key or, if one is already registered, waits for
+   * its result.
+   *
+   * @param key the entry key
+   * @return the result of the fetch that was already registered for the key;
+   *         null when this call registered the fetch itself, when the other
+   *         fetch produced null, or when the wait was interrupted without a
+   *         cancellation being in progress
+   */
+  private RegionEntry getEntryFromFuture(Object key) {
+    FutureResult future = new FutureResult(this.owner.getCancelCriterion());
+    FutureResult old = this.futures.putIfAbsent(key, future);
+    if (old == null) {
+      // no fetch was pending: the caller performs it and later calls notifyFuture
+      return null;
+    }
+
+    if (logger.isTraceEnabled() || DEBUG) {
+      logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "HDFS: waiting for concurrent fetch to complete for key:" + key));
+    }
+    try {
+      return (RegionEntry) old.get();
+    } catch (InterruptedException e) {
+      // restore the interrupt status for callers further up the stack
+      Thread.currentThread().interrupt();
+      // pass the interrupt along so that a cancellation exception raised
+      // here carries it as its cause instead of dropping it
+      this.owner.getCache().getCancelCriterion().checkCancelInProgress(e);
+    }
+    return null;
+  }
+
+  /**
+   * Reads the persisted event for a key through the hoplog organizer and
+   * turns it into a region entry.
+   *
+   * @param key the entry key
+   * @param k the serialized key
+   * @param forceOnHeap passed on to the entry creation
+   * @return the entry built from the persisted event, or null if nothing was read
+   * @throws IOException if the read fails; the primary check is run before rethrowing
+   * @throws ForceReattemptException declared by the calls made here
+   */
+  private RegionEntry getFromHDFS(Object key, byte[] k, boolean forceOnHeap) throws IOException, ForceReattemptException {
+    SortedHoplogPersistedEvent persisted;
+    try {
+      persisted = (SortedHoplogPersistedEvent) owner.getHoplogOrganizer().read(k);
+    } catch (IOException e) {
+      owner.checkForPrimary();
+      throw e;
+    }
+    if (persisted == null) {
+      return null;
+    }
+
+    if (logger.isTraceEnabled() || DEBUG) {
+      logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "HDFS: got from hdfs ev:" + persisted));
+    }
+    return getEntryFromEvent(key, persisted, forceOnHeap, false);
+  }
+
+  /**
+   * Sets the versionTag on the newly faulted-in entry. Does nothing when
+   * concurrency checks are disabled on the owner; otherwise the tag's member
+   * id is first replaced by the canonical id from the owner's version vector.
+   *
+   * @param re the entry, cast to VersionStamp
+   * @param versionTag the tag to apply; its member id is modified
+   */
+  private void setVersionTag(RegionEntry re, VersionTag versionTag) {
+    if (!owner.concurrencyChecksEnabled) {
+      return;
+    }
+    versionTag.setMemberID(
+        owner.getVersionVector().getCanonicalId(versionTag.getMemberID()));
+    ((VersionStamp) re).setVersions(versionTag);
+  }
+
+ private RegionEntry getFromHDFSQueue(Object key, byte[] k, boolean forceOnHeap) throws ForceReattemptException {
+ ConcurrentParallelGatewaySenderQueue q = getHDFSQueue();
+ if (q == null) return null;
+ HDFSGatewayEventImpl hdfsGatewayEvent = (HDFSGatewayEventImpl) q.get(owner.getPartitionedRegion(), k, owner.getId());
+ if (hdfsGatewayEvent != null) {
+ if (logger.isTraceEnabled() || DEBUG) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "HDFS: got from hdfs queue: " + hdfsGatewayEvent));
+ }
+ return getEntryFromEvent(key, hdfsGatewayEvent, forceOnHeap, false);
+ }
+ return null;
+ }
+
+ /**
+ * Returns the parallel gateway sender queue behind the HDFS async event queue of the owning
+ * bucket's partitioned region. The queue is resolved on first use and kept in
+ * {@code hdfsQueue} for later calls.
+ * <p>
+ * Before returning, the bucket region queue matching the owning bucket is checked: if the
+ * owning bucket is primary, the queue bucket must be (or become, within the wait performed by
+ * {@code isPrimaryWithWait()}) primary as well.
+ *
+ * @return the queue, or null if the gateway sender has no event processor
+ * @throws ForceReattemptException declared for callers; not thrown directly by the code below
+ * @throws PrimaryBucketException if the owning bucket is primary but the queue bucket does
+ * not report primary after waiting
+ */
+ private ConcurrentParallelGatewaySenderQueue getHDFSQueue()
+ throws ForceReattemptException {
+ if (this.hdfsQueue == null) {
+ String asyncQId = this.owner.getPartitionedRegion().getHDFSEventQueueName();
+ final AsyncEventQueueImpl asyncQ = (AsyncEventQueueImpl)this.owner.getCache().getAsyncEventQueue(asyncQId);
+ // NOTE(review): asyncQ is dereferenced without a null check; confirm that
+ // getAsyncEventQueue cannot return null for this queue id at this point.
+ final AbstractGatewaySender gatewaySender = (AbstractGatewaySender)asyncQ.getSender();
+ AbstractGatewaySenderEventProcessor ep = gatewaySender.getEventProcessor();
+ // No event processor means there is no queue to read from yet; nothing is cached.
+ if (ep == null) return null;
+ hdfsQueue = (ConcurrentParallelGatewaySenderQueue)ep.getQueue();
+ }
+
+ // Check whether the queue has become primary here.
+ // There could be some time between bucket becoming a primary
+ // and underlying queue becoming a primary, so isPrimaryWithWait()
+ // waits for some time for the queue to become a primary on this member
+ final HDFSBucketRegionQueue brq = hdfsQueue.getBucketRegionQueue(
+ this.owner.getPartitionedRegion(), this.owner.getId());
+ if (brq != null) {
+ if (owner.getBucketAdvisor().isPrimary()
+ && !brq.getBucketAdvisor().isPrimaryWithWait()) {
+ InternalDistributedMember primaryHolder = brq.getBucketAdvisor()
+ .basicGetPrimaryMember();
+ throw new PrimaryBucketException("Bucket " + brq.getName()
+ + " is not primary. Current primary holder is " + primaryHolder);
+ }
+ }
+
+ return hdfsQueue;
+ }
+
+ /**
+ * Builds (or finds) the region entry that represents an event read from the HDFS queue.
+ * <p>
+ * The entry value is {@code Token.TOMBSTONE} for a destroy, {@code Token.INVALID} for an
+ * invalidate, and the event's value otherwise.
+ * <ul>
+ * <li>Without a transaction state on the owner, a new entry is created and returned; it is
+ * not added to the backing map.</li>
+ * <li>With a transaction state and a non-null value, the entry already in the backing map is
+ * returned unless it is missing or removed-but-not-a-tombstone. In that case a new entry is
+ * created and, when {@code forUpdate} is true, inserted into the backing map with size,
+ * entry-count and LRU bookkeeping. If another entry was inserted concurrently, that entry is
+ * returned instead and the new one is released if it is off-heap.</li>
+ * <li>With a transaction state and a null value, null is returned.</li>
+ * </ul>
+ *
+ * @param key the region key
+ * @param event the queued event to convert
+ * @param forceOnHeap used only on the non-transactional path; the transactional path derives
+ * the on-heap decision from {@code owner.getOffHeap()} and {@code forUpdate}
+ * @param forUpdate whether the entry is being fetched for an update and should be placed in
+ * the backing map (transactional path only)
+ * @return the entry, or null as described above
+ */
+ public RegionEntry getEntryFromEvent(Object key, HDFSGatewayEventImpl event, boolean forceOnHeap, boolean forUpdate) {
+ Object val;
+ if (event.getOperation().isDestroy()) {
+ val = Token.TOMBSTONE;
+ } else if (event.getOperation().isInvalidate()) {
+ val = Token.INVALID;
+ } else {
+ val = event.getValue();
+ }
+ RegionEntry re = null;
+ final TXStateInterface tx = owner.getTXState();
+ if (tx == null) {
+ re = createRegionEntry(key, val, event.getVersionTag(), forceOnHeap);
+ return re;
+ }
+ else
+ if (val != null) {
+ if (((re = this.backingRM.getEntryInVM(key)) == null)
+ || (re.isRemoved() && !re.isTombstone())) {
+ boolean shouldCreateOnHeapEntry = !(owner.getOffHeap() && forUpdate);
+ re = createRegionEntry(key, val, event.getVersionTag(), shouldCreateOnHeapEntry);
+ if (forUpdate) {
+ // tx is always non-null on this branch; the check below is redundant but harmless
+ if (re != null && tx != null) {
+ // put the region entry in backing CHM of AbstractRegionMap so that
+ // it can be locked in basicPut/destroy
+ RegionEntry oldRe = backingRM.putEntryIfAbsent(key, re);
+ if (oldRe != null) {
+ // another entry is already in the map: discard ours and use the existing one
+ if (re instanceof OffHeapRegionEntry && !oldRe.equals(re)) {
+ ((OffHeapRegionEntry)re).release();
+ }
+ return oldRe;
+ }
+ re.setMarkedForEviction();
+ owner.updateSizeOnCreate(key,
+ owner.calculateRegionEntryValueSize(re));
+ ((AbstractRegionMap)backingRM).incEntryCount(1);
+ ((AbstractRegionMap)backingRM).lruEntryCreate(re);
+ }
+ }
+ }
+ }
+ return re;
+ }
+
+ /**
+ * Builds (or finds) the region entry that represents an event read from a hoplog.
+ * <p>
+ * The entry value comes from {@code getValueFromEvent(event)}.
+ * <ul>
+ * <li>Without a transaction state on the owner, a new entry is created and returned; it is
+ * not added to the backing map.</li>
+ * <li>With a transaction state and a non-null value, the entry already in the backing map is
+ * returned unless it is missing or removed-but-not-a-tombstone. In that case a new entry is
+ * created and, when {@code forUpdate} is true, inserted into the backing map with size,
+ * entry-count and LRU bookkeeping. If another entry was inserted concurrently, that entry is
+ * returned instead and the new one is released if it is off-heap.</li>
+ * <li>With a transaction state and a null value, null is returned.</li>
+ * </ul>
+ *
+ * @param key the region key
+ * @param event the persisted event to convert
+ * @param forceOnHeap used only on the non-transactional path; the transactional path derives
+ * the on-heap decision from {@code owner.getOffHeap()} and {@code forUpdate}
+ * @param forUpdate whether the entry is being fetched for an update and should be placed in
+ * the backing map (transactional path only)
+ * @return the entry, or null as described above
+ */
+ public RegionEntry getEntryFromEvent(Object key, SortedHoplogPersistedEvent event, boolean forceOnHeap, boolean forUpdate) {
+ Object val = getValueFromEvent(event);
+ RegionEntry re = null;
+ final TXStateInterface tx = owner.getTXState();
+ if (tx == null) {
+ re = createRegionEntry(key, val, event.getVersionTag(), forceOnHeap);
+ return re;
+ }
+ else // FOR TX case, we need to create off heap entry if required
+ if (val != null) {
+ if (((re = this.backingRM.getEntryInVM(key)) == null)
+ || (re.isRemoved() && !re.isTombstone())) {
+ boolean shouldCreateOnHeapEntry = !(owner.getOffHeap() && forUpdate);
+ re = createRegionEntry(key, val, event.getVersionTag(), shouldCreateOnHeapEntry);
+ if(forUpdate) {
+ // tx is always non-null on this branch; the check below is redundant but harmless
+ if (re != null && tx != null) {
+ // put the region entry in backing CHM of AbstractRegionMap so that
+ // it can be locked in basicPut/destroy
+ RegionEntry oldRe = backingRM.putEntryIfAbsent(key, re);
+ if (oldRe != null) {
+ // another entry is already in the map: discard ours and use the existing one
+ if (re instanceof OffHeapRegionEntry && !oldRe.equals(re)) {
+ ((OffHeapRegionEntry)re).release();
+ }
+ return oldRe;
+ }
+ re.setMarkedForEviction();
+ owner.updateSizeOnCreate(key,
+ owner.calculateRegionEntryValueSize(re));
+ ((AbstractRegionMap)backingRM).incEntryCount(1);
+ ((AbstractRegionMap)backingRM).lruEntryCreate(re);
+ }
+ }
+ }
+ }
+ return re;
+ }
+
+ private RegionEntry createRegionEntry(Object key, Object value, VersionTag tag, boolean forceOnHeap) {
+ RegionEntryFactory ref = backingRM.getEntryFactory();
+ if (forceOnHeap) {
+ ref = ref.makeOnHeap();
+ }
+ value = getValueDuringGII(key, value);
+ RegionEntry re = ref.createEntry(this.owner, key, value);
+ setVersionTag(re, tag);
+ if (re instanceof LRUEntry) {
+ assert backingRM instanceof AbstractLRURegionMap;
+ EnableLRU ccHelper = ((AbstractLRURegionMap)backingRM)._getCCHelper();
+ ((LRUEntry)re).updateEntrySize(ccHelper);
+ }
+ return re;
+ }
+
+ private Object getValueDuringGII(Object key, Object value) {
+ if (owner.getIndexUpdater() != null && !owner.isInitialized()) {
+ return AbstractRegionMap.listOfDeltasCreator.newValue(key, owner, value,
+ null);
+ }
+ return value;
+ }
+
+ private Set createEntriesSet(IteratorType type)
+ throws ForceReattemptException {
+ ConcurrentParallelGatewaySenderQueue q = getHDFSQueue();
+ if (q == null) return Collections.emptySet();
+ HDFSBucketRegionQueue brq = q.getBucketRegionQueue(this.owner.getPartitionedRegion(), owner.getId());
+ return new HDFSEntriesSet(owner, brq, owner.getHoplogOrganizer(), type, refs);
+ }
+
+ private void closeDeadIterators() {
+ Reference<? extends HDFSIterator> weak;
+ while ((weak = refs.poll()) != null) {
+ if (logger.isTraceEnabled() || DEBUG) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Closing weak ref for iterator "
+ + weak.get()));
+ }
+ weak.get().close();
+ }
+ }
+
+ /**
+ * Maps a persisted event to the value kept in a region entry: {@code Token.TOMBSTONE} for a
+ * destroy, {@code Token.INVALID} for an invalidate, and the event's own value otherwise.
+ */
+ private Object getValueFromEvent(PersistedEventImpl ev) {
+ if (ev.getOperation().isDestroy()) {
+ return Token.TOMBSTONE;
+ }
+ return ev.getOperation().isInvalidate() ? Token.INVALID : ev.getValue();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMapImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMapImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMapImpl.java
new file mode 100644
index 0000000..9336ed7
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMapImpl.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import java.util.Collection;
+
+import com.gemstone.gemfire.cache.CacheWriterException;
+import com.gemstone.gemfire.cache.TimeoutException;
+import com.gemstone.gemfire.internal.size.SingleObjectSizer;
+
+/**
+ * Implementation of RegionMap that reads data from HDFS.
+ * <p>
+ * Entry lookups, the entry collection, and the size/emptiness queries are forwarded to an
+ * {@link HDFSRegionMapDelegate}; all other behavior is inherited from
+ * {@link AbstractRegionMap}.
+ */
+public class HDFSRegionMapImpl extends AbstractRegionMap implements HDFSRegionMap {
+
+ /** Performs the HDFS-aware lookups on behalf of this map. */
+ private final HDFSRegionMapDelegate delegate;
+
+ /**
+ * Creates the map for the given owner and wires up its delegate.
+ *
+ * @param owner the owning region; expected to be a {@code BucketRegion} (checked by assertion only)
+ * @param attrs the region map attributes
+ * @param internalRegionArgs the internal region arguments
+ */
+ public HDFSRegionMapImpl(LocalRegion owner, Attributes attrs,
+ InternalRegionArguments internalRegionArgs) {
+ super(internalRegionArgs);
+ assert owner instanceof BucketRegion;
+ initialize(owner, attrs, internalRegionArgs, false);
+ this.delegate = new HDFSRegionMapDelegate(owner, attrs, internalRegionArgs, this);
+ }
+
+ /** Looks up the entry for {@code key} through the delegate, without an associated event. */
+ @Override
+ public RegionEntry getEntry(Object key) {
+ return delegate.getEntry(key, null);
+ }
+
+ /** Looks up the entry for the given event through the delegate. */
+ @Override
+ protected RegionEntry getEntry(EntryEventImpl event) {
+ return delegate.getEntry(event);
+ }
+
+ /** Returns the delegate's view of the region entries. */
+ @Override
+ @SuppressWarnings("unchecked")
+ public Collection<RegionEntry> regionEntries() {
+ return delegate.regionEntries();
+ }
+
+ /** Returns the size reported by the delegate. */
+ @Override
+ public int size() {
+ return delegate.size();
+ }
+
+ /** Returns whether the delegate reports the map as empty. */
+ @Override
+ public boolean isEmpty() {
+ return delegate.isEmpty();
+ }
+
+ /** Returns the delegate that performs the HDFS-aware operations for this map. */
+ @Override
+ public HDFSRegionMapDelegate getDelegate() {
+ return this.delegate;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalCache.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalCache.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalCache.java
index bda5a27..36eee80 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalCache.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalCache.java
@@ -20,6 +20,8 @@ package com.gemstone.gemfire.internal.cache;
import java.util.Collection;
import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSStoreDirector;
import com.gemstone.gemfire.cache.query.internal.cq.CqService;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.internal.cache.extension.Extensible;
@@ -43,5 +45,7 @@ public interface InternalCache extends Cache, Extensible<Cache> {
public CqService getCqService();
+ public Collection<HDFSStoreImpl> getHDFSStores() ;
+
public <T extends CacheService> T getService(Class<T> clazz);
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalDataView.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalDataView.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalDataView.java
index 0885477..e506f2e 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalDataView.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalDataView.java
@@ -39,22 +39,17 @@ public interface InternalDataView {
* @param keyInfo
* @param localRegion
* @param updateStats
- * @param disableCopyOnRead
- * @param preferCD
+ * @param disableCopyOnRead
+ * @param preferCD
* @param clientEvent TODO
* @param returnTombstones TODO
* @param retainResult if true then the result may be a retained off-heap reference
* @return the object associated with the key
*/
@Retained
- Object getDeserializedValue(KeyInfo keyInfo,
- LocalRegion localRegion,
- boolean updateStats,
- boolean disableCopyOnRead,
- boolean preferCD,
- EntryEventImpl clientEvent,
- boolean returnTombstones,
- boolean retainResult);
+ Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion,
+ boolean updateStats, boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent,
+ boolean returnTombstones, boolean allowReadFromHDFS, boolean retainResult);
/**
* @param event
@@ -187,8 +182,8 @@ public interface InternalDataView {
* @return the Object associated with the key
*/
Object findObject(KeyInfo key, LocalRegion r, boolean isCreate, boolean generateCallbacks,
- Object value, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient,
- EntryEventImpl clientEvent, boolean returnTombstones);
+ Object value, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient,
+ EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS);
/**
@@ -229,18 +224,13 @@ public interface InternalDataView {
*
* @param localRegion
* @param key
- * @param doNotLockEntry
+ * @param doNotLockEntry
* @param requestingClient the client that made the request, or null if not from a client
* @param clientEvent the client event, if any
* @param returnTombstones TODO
* @return the serialized value from the cache
*/
- Object getSerializedValue(LocalRegion localRegion,
- KeyInfo key,
- boolean doNotLockEntry,
- ClientProxyMembershipID requestingClient,
- EntryEventImpl clientEvent,
- boolean returnTombstones) throws DataLocationException;
+ Object getSerializedValue(LocalRegion localRegion, KeyInfo key, boolean doNotLockEntry, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws DataLocationException;
abstract void checkSupportsRegionDestroy() throws UnsupportedOperationInTransactionException;
abstract void checkSupportsRegionInvalidate() throws UnsupportedOperationInTransactionException;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalRegionArguments.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalRegionArguments.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalRegionArguments.java
index f7d46fe..41e763d 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalRegionArguments.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalRegionArguments.java
@@ -37,6 +37,7 @@ public final class InternalRegionArguments
private boolean isUsedForPartitionedRegionAdmin;
private boolean isUsedForSerialGatewaySenderQueue;
private boolean isUsedForParallelGatewaySenderQueue;
+ private boolean isUsedForHDFSParallelGatewaySenderQueue = false;
private int bucketRedundancy;
private boolean isUsedForPartitionedRegionBucket;
private RegionAdvisor partitionedRegionAdvisor;
@@ -272,11 +273,26 @@ public final class InternalRegionArguments
this.isUsedForParallelGatewaySenderQueue = queueFlag;
return this;
}
+ /**
+ * Records whether the region is used for an HDFS parallel gateway sender queue.
+ *
+ * @param queueFlag the new flag value
+ * @return this instance, so calls can be chained
+ */
+ public InternalRegionArguments setIsUsedForHDFSParallelGatewaySenderQueue(
+ boolean queueFlag) {
+ this.isUsedForHDFSParallelGatewaySenderQueue = queueFlag;
+ return this;
+ }
public boolean isUsedForParallelGatewaySenderQueue() {
return this.isUsedForParallelGatewaySenderQueue;
}
+ /**
+ * @return the flag set through {@code setIsUsedForHDFSParallelGatewaySenderQueue}
+ */
+ public boolean isUsedForHDFSParallelGatewaySenderQueue() {
+ return this.isUsedForHDFSParallelGatewaySenderQueue;
+ }
+
+ /**
+ * @return true only if the region is a partitioned region bucket whose partitioned region
+ * names an HDFS store and is not configured as HDFS write-only
+ */
+ public boolean isReadWriteHDFSRegion() {
+ if (!isUsedForPartitionedRegionBucket()) {
+ return false;
+ }
+ if (getPartitionedRegion().getHDFSStoreName() == null) {
+ return false;
+ }
+ return !getPartitionedRegion().getHDFSWriteOnly();
+ }
+
public InternalRegionArguments setParallelGatewaySender(
AbstractGatewaySender pgSender) {
this.parallelGatewaySender = pgSender;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
index 3ad294c..b3de9b7 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
@@ -116,6 +116,11 @@ import com.gemstone.gemfire.cache.client.internal.ServerRegionProxy;
import com.gemstone.gemfire.cache.control.ResourceManager;
import com.gemstone.gemfire.cache.execute.Function;
import com.gemstone.gemfire.cache.execute.ResultCollector;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSIntegrationUtil;
+import com.gemstone.gemfire.cache.hdfs.internal.HoplogListenerForRegion;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
import com.gemstone.gemfire.cache.partition.PartitionRegionHelper;
import com.gemstone.gemfire.cache.query.FunctionDomainException;
import com.gemstone.gemfire.cache.query.Index;
@@ -460,6 +465,10 @@ public class LocalRegion extends AbstractRegion
// Lock for updating PR MetaData on client side
public final Lock clientMetaDataLock = new ReentrantLock();
+
+ protected HdfsRegionManager hdfsManager;
+ protected HoplogListenerForRegion hoplogListener;
+
/**
* There seem to be cases where a region can be created and yet the
* distributed system is not yet in place...
@@ -632,6 +641,7 @@ public class LocalRegion extends AbstractRegion
}
}
+ this.hdfsManager = initHDFSManager();
this.dsi = findDiskStore(attrs, internalRegionArgs);
this.diskRegion = createDiskRegion(internalRegionArgs);
this.entries = createRegionMap(internalRegionArgs);
@@ -686,8 +696,22 @@ public class LocalRegion extends AbstractRegion
}
+ /**
+ * Registers this region with the HDFS region director when an HDFS store name is configured.
+ * As a side effect, a new {@code HoplogListenerForRegion} is stored in {@code hoplogListener}
+ * and the cache is set on the director.
+ *
+ * @return the manager returned by the director, or null if no HDFS store name is configured
+ */
+ private HdfsRegionManager initHDFSManager() {
+ if (this.getHDFSStoreName() == null) {
+ return null;
+ }
+ this.hoplogListener = new HoplogListenerForRegion();
+ HDFSRegionDirector.getInstance().setCache(cache);
+ return HDFSRegionDirector.getInstance().manageRegion(this,
+ this.getHDFSStoreName(), hoplogListener);
+ }
+
private RegionMap createRegionMap(InternalRegionArguments internalRegionArgs) {
RegionMap result = null;
+ if ((internalRegionArgs.isReadWriteHDFSRegion()) && this.diskRegion != null) {
+ this.diskRegion.setEntriesMapIncompatible(true);
+ }
if (this.diskRegion != null) {
result = this.diskRegion.useExistingRegionMap(this);
}
@@ -953,6 +977,11 @@ public class LocalRegion extends AbstractRegion
existing = (LocalRegion)this.subregions.get(subregionName);
if (existing == null) {
+ // create the async queue for HDFS if required.
+ HDFSIntegrationUtil.createAndAddAsyncQueue(regionPath,
+ regionAttributes, this.cache);
+ regionAttributes = cache.setEvictionAttributesForLargeRegion(
+ regionAttributes);
if (regionAttributes.getScope().isDistributed()
&& internalRegionArgs.isUsedForPartitionedRegionBucket()) {
final PartitionedRegion pr = internalRegionArgs
@@ -962,8 +991,15 @@ public class LocalRegion extends AbstractRegion
internalRegionArgs.setKeyRequiresRegionContext(pr
.keyRequiresRegionContext());
if (pr.isShadowPR()) {
- newRegion = new BucketRegionQueue(subregionName, regionAttributes,
- this, this.cache, internalRegionArgs);
+ if (!pr.isShadowPRForHDFS()) {
+ newRegion = new BucketRegionQueue(subregionName, regionAttributes,
+ this, this.cache, internalRegionArgs);
+ }
+ else {
+ newRegion = new HDFSBucketRegionQueue(subregionName, regionAttributes,
+ this, this.cache, internalRegionArgs);
+ }
+
} else {
newRegion = new BucketRegion(subregionName, regionAttributes,
this, this.cache, internalRegionArgs);
@@ -1098,6 +1134,7 @@ public class LocalRegion extends AbstractRegion
if (event.getEventId() == null && generateEventID()) {
event.setNewEventId(cache.getDistributedSystem());
}
+ assert event.isFetchFromHDFS() : "validatedPut() should have been called";
// Fix for 42448 - Only make create with null a local invalidate for
// normal regions. Otherwise, it will become a distributed invalidate.
if (getDataPolicy() == DataPolicy.NORMAL) {
@@ -1224,20 +1261,18 @@ public class LocalRegion extends AbstractRegion
* @param retainResult if true then the result may be a retained off-heap reference
* @return the value for the given key
*/
- public final Object getDeserializedValue(RegionEntry re,
- final KeyInfo keyInfo,
- final boolean updateStats,
- boolean disableCopyOnRead,
- boolean preferCD,
- EntryEventImpl clientEvent,
- boolean returnTombstones,
- boolean retainResult) {
+ public final Object getDeserializedValue(RegionEntry re, final KeyInfo keyInfo, final boolean updateStats, boolean disableCopyOnRead,
+ boolean preferCD, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS, boolean retainResult) {
if (this.diskRegion != null) {
this.diskRegion.setClearCountReference();
}
try {
if (re == null) {
- re = this.entries.getEntry(keyInfo.getKey());
+ if (allowReadFromHDFS) {
+ re = this.entries.getEntry(keyInfo.getKey());
+ } else {
+ re = this.entries.getOperationalEntryInVM(keyInfo.getKey());
+ }
}
//skip updating the stats if the value is null
// TODO - We need to clean up the callers of the this class so that we can
@@ -1347,7 +1382,7 @@ public class LocalRegion extends AbstractRegion
public Object get(Object key, Object aCallbackArgument,
boolean generateCallbacks, EntryEventImpl clientEvent) throws TimeoutException, CacheLoaderException
{
- Object result = get(key, aCallbackArgument, generateCallbacks, false, false, null, clientEvent, false);
+ Object result = get(key, aCallbackArgument, generateCallbacks, false, false, null, clientEvent, false, true/*allowReadFromHDFS*/);
if (Token.isInvalid(result)) {
result = null;
}
@@ -1357,16 +1392,11 @@ public class LocalRegion extends AbstractRegion
/*
* @see BucketRegion#getSerialized(KeyInfo, boolean, boolean)
*/
- public Object get(Object key,
- Object aCallbackArgument,
- boolean generateCallbacks,
- boolean disableCopyOnRead,
- boolean preferCD,
- ClientProxyMembershipID requestingClient,
- EntryEventImpl clientEvent,
- boolean returnTombstones) throws TimeoutException, CacheLoaderException {
+ public Object get(Object key, Object aCallbackArgument,
+ boolean generateCallbacks, boolean disableCopyOnRead, boolean preferCD,
+ ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws TimeoutException, CacheLoaderException {
return get(key, aCallbackArgument,
- generateCallbacks, disableCopyOnRead, preferCD,requestingClient, clientEvent, returnTombstones, false, false);
+ generateCallbacks, disableCopyOnRead, preferCD,requestingClient, clientEvent, returnTombstones, false, allowReadFromHDFS, false);
}
/**
@@ -1388,17 +1418,16 @@ public class LocalRegion extends AbstractRegion
public Object getRetained(Object key, Object aCallbackArgument,
boolean generateCallbacks, boolean disableCopyOnRead,
ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean opScopeIsLocal) throws TimeoutException, CacheLoaderException {
- return get(key, aCallbackArgument, generateCallbacks, disableCopyOnRead, true, requestingClient, clientEvent, returnTombstones, opScopeIsLocal,
- false /* see GEODE-1291*/);
+ return get(key, aCallbackArgument, generateCallbacks, disableCopyOnRead, true, requestingClient, clientEvent, returnTombstones, opScopeIsLocal, true, false/* see GEODE-1291*/);
}
/**
* @param opScopeIsLocal if true then just check local storage for a value; if false then try to find the value if it is not local
* @param retainResult if true then the result may be a retained off-heap reference.
*/
public Object get(Object key, Object aCallbackArgument,
- boolean generateCallbacks, boolean disableCopyOnRead, boolean preferCD,
- ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones,
- boolean opScopeIsLocal, boolean retainResult) throws TimeoutException, CacheLoaderException
+ boolean generateCallbacks, boolean disableCopyOnRead, boolean preferCD,
+ ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones,
+ boolean opScopeIsLocal, boolean allowReadFromHDFS, boolean retainResult) throws TimeoutException, CacheLoaderException
{
assert !retainResult || preferCD;
validateKey(key);
@@ -1411,8 +1440,7 @@ public class LocalRegion extends AbstractRegion
boolean isMiss = true;
try {
KeyInfo keyInfo = getKeyInfo(key, aCallbackArgument);
- Object value = getDataView().getDeserializedValue(keyInfo, this, true, disableCopyOnRead, preferCD, clientEvent, returnTombstones,
- retainResult);
+ Object value = getDataView().getDeserializedValue(keyInfo, this, true, disableCopyOnRead, preferCD, clientEvent, returnTombstones, allowReadFromHDFS, retainResult);
final boolean isCreate = value == null;
isMiss = value == null || Token.isInvalid(value)
|| (!returnTombstones && value == Token.TOMBSTONE);
@@ -1425,13 +1453,13 @@ public class LocalRegion extends AbstractRegion
// if scope is local and there is no loader, then
// don't go further to try and get value
if (!opScopeIsLocal
- && ((getScope().isDistributed())
+ && ((getScope().isDistributed() && !isHDFSRegion())
|| hasServerProxy()
|| basicGetLoader() != null)) {
// serialize search/load threads if not in txn
value = getDataView().findObject(keyInfo,
this, isCreate, generateCallbacks, value, disableCopyOnRead,
- preferCD, requestingClient, clientEvent, returnTombstones);
+ preferCD, requestingClient, clientEvent, returnTombstones, false/*allowReadFromHDFS*/);
if (!returnTombstones && value == Token.TOMBSTONE) {
value = null;
}
@@ -1457,7 +1485,7 @@ public class LocalRegion extends AbstractRegion
*/
final public void recordMiss(final RegionEntry re, Object key) {
final RegionEntry e;
- if (re == null && !isTX()) {
+ if (re == null && !isTX() && !isHDFSRegion()) {
e = basicGetEntry(key);
} else {
e = re;
@@ -1466,30 +1494,60 @@ public class LocalRegion extends AbstractRegion
}
/**
+ * @return true if this region has been configured for HDFS persistence
+ */
+ public boolean isHDFSRegion() {
+ return false;
+ }
+
+ /**
+ * @return true if this region is configured to read and write data from HDFS
+ */
+ public boolean isHDFSReadWriteRegion() {
+ return false;
+ }
+
+ /**
+ * @return true if this region is configured to only write to HDFS
+ */
+ protected boolean isHDFSWriteOnly() {
+ return false;
+ }
+
+ /**
+ * FOR TESTING ONLY
+ */
+ public HoplogListenerForRegion getHoplogListener() {
+ return hoplogListener;
+ }
+
+ /**
+ * FOR TESTING ONLY
+ */
+ public HdfsRegionManager getHdfsRegionManager() {
+ return hdfsManager;
+ }
+
+ /**
* optimized to only allow one thread to do a search/load, other threads wait
* on a future
- * @param keyInfo
+ *
+ * @param keyInfo
* @param p_isCreate
* true if call found no entry; false if updating an existing
* entry
* @param generateCallbacks
* @param p_localValue
-* the value retrieved from the region for this object.
+ * the value retrieved from the region for this object.
* @param disableCopyOnRead if true then do not make a copy
* @param preferCD true if the preferred result form is CachedDeserializable
* @param clientEvent the client event, if any
* @param returnTombstones whether to return tombstones
*/
@Retained
- Object nonTxnFindObject(KeyInfo keyInfo,
- boolean p_isCreate,
- boolean generateCallbacks,
- Object p_localValue,
- boolean disableCopyOnRead,
- boolean preferCD,
- ClientProxyMembershipID requestingClient,
- EntryEventImpl clientEvent,
- boolean returnTombstones)
+ Object nonTxnFindObject(KeyInfo keyInfo, boolean p_isCreate,
+ boolean generateCallbacks, Object p_localValue, boolean disableCopyOnRead, boolean preferCD,
+ ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS)
throws TimeoutException, CacheLoaderException
{
final Object key = keyInfo.getKey();
@@ -1548,8 +1606,7 @@ public class LocalRegion extends AbstractRegion
try {
boolean partitioned = this.getDataPolicy().withPartitioning();
if (!partitioned) {
- localValue = getDeserializedValue(null, keyInfo, isCreate, disableCopyOnRead, preferCD, clientEvent, false,
- false);
+ localValue = getDeserializedValue(null, keyInfo, isCreate, disableCopyOnRead, preferCD, clientEvent, false, false/*allowReadFromHDFS*/, false);
// stats have now been updated
if (localValue != null && !Token.isInvalid(localValue)) {
@@ -1558,7 +1615,7 @@ public class LocalRegion extends AbstractRegion
}
isCreate = localValue == null;
result = findObjectInSystem(keyInfo, isCreate, null, generateCallbacks,
- localValue, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones);
+ localValue, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, false/*allowReadFromHDFS*/);
} else {
@@ -1566,7 +1623,7 @@ public class LocalRegion extends AbstractRegion
// For PRs we don't want to deserialize the value and we can't use findObjectInSystem because
// it can invoke code that is transactional.
result = getSharedDataView().findObject(keyInfo, this, true/*isCreate*/, generateCallbacks,
- localValue, disableCopyOnRead, preferCD, null, null, false);
+ localValue, disableCopyOnRead, preferCD, null, null, false, allowReadFromHDFS);
// TODO why are we not passing the client event or returnTombstones in the above invokation?
}
@@ -1749,6 +1806,7 @@ public class LocalRegion extends AbstractRegion
public final EntryEventImpl newPutEntryEvent(Object key, Object value,
Object aCallbackArgument) {
EntryEventImpl ev = newUpdateEntryEvent(key, value, aCallbackArgument);
+ ev.setFetchFromHDFS(false);
ev.setPutDML(true);
return ev;
}
@@ -1880,11 +1938,23 @@ public class LocalRegion extends AbstractRegion
}
}
+ /**
+ * @return true only if this region is a partitioned region bucket, is an HDFS read/write
+ * region, and its partitioned region reports {@code includeHDFSResults()}
+ */
+ protected boolean includeHDFSResults() {
+ if (!isUsedForPartitionedRegionBucket()) {
+ return false;
+ }
+ if (!isHDFSReadWriteRegion()) {
+ return false;
+ }
+ return getPartitionedRegion().includeHDFSResults();
+ }
+
+
/** a fast estimate of total number of entries locally in the region */
public long getEstimatedLocalSize() {
RegionMap rm;
if (!this.isDestroyed) {
long size;
+ if (isHDFSReadWriteRegion() && this.initialized) {
+ // this size is not used by HDFS region iterators
+ // fixes bug 49239
+ return 0;
+ }
// if region has not been initialized yet, then get the estimate from
// disk region's recovery map if available
if (!this.initialized && this.diskRegion != null
@@ -2196,6 +2266,9 @@ public class LocalRegion extends AbstractRegion
if (this.imageState.isClient() && !this.concurrencyChecksEnabled) {
return result - this.imageState.getDestroyedEntriesCount();
}
+ if (includeHDFSResults()) {
+ return result;
+ }
return result - this.tombstoneCount.get();
}
}
@@ -2931,18 +3004,11 @@ public class LocalRegion extends AbstractRegion
* @param clientEvent the client's event, if any. If not null, we set the version tag
* @param returnTombstones TODO
* @return the deserialized value
- * @see LocalRegion#findObjectInSystem(KeyInfo, boolean, TXStateInterface, boolean, Object, boolean, boolean, ClientProxyMembershipID, EntryEventImpl, boolean)
- */
- protected Object findObjectInSystem(KeyInfo keyInfo,
- boolean isCreate,
- TXStateInterface tx,
- boolean generateCallbacks,
- Object localValue,
- boolean disableCopyOnRead,
- boolean preferCD,
- ClientProxyMembershipID requestingClient,
- EntryEventImpl clientEvent,
- boolean returnTombstones)
+ * @see DistributedRegion#findObjectInSystem(KeyInfo, boolean, TXStateInterface, boolean, Object, boolean, boolean, ClientProxyMembershipID, EntryEventImpl, boolean, boolean )
+ */
+ protected Object findObjectInSystem(KeyInfo keyInfo, boolean isCreate,
+ TXStateInterface tx, boolean generateCallbacks, Object localValue, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient,
+ EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS)
throws CacheLoaderException, TimeoutException
{
final Object key = keyInfo.getKey();
@@ -5317,6 +5383,9 @@ public class LocalRegion extends AbstractRegion
// Notify bridge clients (if this is a BridgeServer)
event.setEventType(eventType);
notifyBridgeClients(event);
+ if (this.hdfsStoreName != null) {
+ notifyGatewaySender(eventType, event);
+ }
if(callDispatchListenerEvent){
dispatchListenerEvent(eventType, event);
}
@@ -7202,8 +7271,24 @@ public class LocalRegion extends AbstractRegion
if (generateEventID()) {
event.setNewEventId(cache.getDistributedSystem());
}
+ event.setFetchFromHDFS(false);
+ return event;
+ }
+
+ @Retained
+ protected EntryEventImpl generateCustomEvictDestroyEvent(final Object key) {
+ @Retained EntryEventImpl event = EntryEventImpl.create(
+ this, Operation.CUSTOM_EVICT_DESTROY, key, null/* newValue */,
+ null, false, getMyId());
+
+ // Fix for bug#36963
+ if (generateEventID()) {
+ event.setNewEventId(cache.getDistributedSystem());
+ }
+ event.setFetchFromHDFS(false);
return event;
}
+
/**
* @return true if the evict destroy was done; false if it was not needed
*/
@@ -9856,6 +9941,8 @@ public class LocalRegion extends AbstractRegion
}
}
+ clearHDFSData();
+
if (!isProxy()) {
// Now we need to recreate all the indexes.
//If the indexManager is null we don't have to worry
@@ -9894,6 +9981,11 @@ public class LocalRegion extends AbstractRegion
}
}
+ /**
+ * Clears HDFS data, if present. This base implementation is an intentional no-op; it is a
+ * hook for subclasses that actually hold HDFS data.
+ */
+ protected void clearHDFSData() {
+ //do nothing, clear is implemented for subclasses like BucketRegion.
+ }
+
@Override
void basicLocalClear(RegionEventImpl rEvent)
{
@@ -10670,6 +10762,7 @@ public class LocalRegion extends AbstractRegion
}
public final DistributedPutAllOperation newPutAllForPUTDmlOperation(Map<?, ?> map, Object callbackArg) {
DistributedPutAllOperation dpao = newPutAllOperation(map, callbackArg);
+ dpao.getEvent().setFetchFromHDFS(false);
dpao.getEvent().setPutDML(true);
return dpao;
}
@@ -10725,6 +10818,7 @@ public class LocalRegion extends AbstractRegion
putallOp, this, Operation.PUTALL_CREATE, key, value);
try {
+ event.setFetchFromHDFS(putallOp.getEvent().isFetchFromHDFS());
event.setPutDML(putallOp.getEvent().isPutDML());
if (tagHolder != null) {
@@ -12827,6 +12921,22 @@ public class LocalRegion extends AbstractRegion
/**
* @return the current value read from the countNotFoundInLocal counter
*/
public Integer getCountNotFoundInLocal() {
return countNotFoundInLocal.get();
}
+ /// End of Variables and methods for test Hook for HDFS ///////
+ /**
+ * Base implementation of forced HDFS compaction: it never compacts and
+ * always throws, reporting the localized HOPLOG_DOES_NOT_USE_HDFSSTORE
+ * message formatted with this region's name. Both parameters are unused
+ * here.
+ *
+ * @param isMajor not used by this implementation
+ * @param maxWaitTime not used by this implementation
+ * @throws UnsupportedOperationException always
+ */
+ public void forceHDFSCompaction(boolean isMajor, Integer maxWaitTime) {
+ throw new UnsupportedOperationException(
+ LocalizedStrings.HOPLOG_DOES_NOT_USE_HDFSSTORE
+ .toLocalizedString(getName()));
+ }
+
+ /**
+ * Base implementation of the HDFS queue flush: it never flushes and
+ * always throws, reporting the localized HOPLOG_DOES_NOT_USE_HDFSSTORE
+ * message formatted with this region's name.
+ *
+ * @param maxWaitTime not used by this implementation
+ * @throws UnsupportedOperationException always
+ */
+ public void flushHDFSQueue(int maxWaitTime) {
+ throw new UnsupportedOperationException(
+ LocalizedStrings.HOPLOG_DOES_NOT_USE_HDFSSTORE
+ .toLocalizedString(getName()));
+ }
+
+ public long lastMajorHDFSCompaction() {
+ throw new UnsupportedOperationException();
+ }
public static void simulateClearForTests(boolean flag) {
simulateClearForTests = flag;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegionDataView.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegionDataView.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegionDataView.java
index c26ff10..5193a17 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegionDataView.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegionDataView.java
@@ -17,6 +17,7 @@
package com.gemstone.gemfire.internal.cache;
import java.util.Collection;
+import java.util.Iterator;
import java.util.Set;
import com.gemstone.gemfire.cache.EntryNotFoundException;
@@ -35,16 +36,9 @@ public class LocalRegionDataView implements InternalDataView {
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.InternalDataView#getDeserializedValue(java.lang.Object, com.gemstone.gemfire.internal.cache.LocalRegion, boolean)
*/
- public Object getDeserializedValue(KeyInfo keyInfo,
- LocalRegion localRegion,
- boolean updateStats,
- boolean disableCopyOnRead,
- boolean preferCD,
- EntryEventImpl clientEvent,
- boolean returnTombstones,
- boolean retainResult) {
- return localRegion.getDeserializedValue(null, keyInfo, updateStats, disableCopyOnRead, preferCD, clientEvent, returnTombstones,
- retainResult);
+ public Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion,
+ boolean updateStats, boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadsFromHDFS, boolean retainResult) {
+ // Pure delegation: every argument, including allowReadsFromHDFS, is
+ // forwarded to the region unchanged; null is passed as the first argument.
+ return localRegion.getDeserializedValue(null, keyInfo, updateStats, disableCopyOnRead, preferCD, clientEvent, returnTombstones, allowReadsFromHDFS, retainResult);
}
/* (non-Javadoc)
@@ -142,17 +136,9 @@ public class LocalRegionDataView implements InternalDataView {
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.InternalDataView#findObject(com.gemstone.gemfire.internal.cache.LocalRegion, java.lang.Object, java.lang.Object, boolean, boolean, java.lang.Object)
*/
- public Object findObject(KeyInfo keyInfo,
- LocalRegion r,
- boolean isCreate,
- boolean generateCallbacks,
- Object value,
- boolean disableCopyOnRead,
- boolean preferCD,
- ClientProxyMembershipID requestingClient,
- EntryEventImpl clientEvent,
- boolean returnTombstones) {
- return r.nonTxnFindObject(keyInfo, isCreate, generateCallbacks, value, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones);
+ public Object findObject(KeyInfo keyInfo, LocalRegion r, boolean isCreate,
+ boolean generateCallbacks, Object value, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) {
+ // Pure delegation to the region's nonTxnFindObject; allowReadFromHDFS is
+ // forwarded unchanged as the last argument.
+ return r.nonTxnFindObject(keyInfo, isCreate, generateCallbacks, value, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, allowReadFromHDFS);
}
/* (non-Javadoc)
@@ -194,12 +180,7 @@ public class LocalRegionDataView implements InternalDataView {
* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.InternalDataView#getSerializedValue(com.gemstone.gemfire.internal.cache.BucketRegion, java.lang.Object, java.lang.Object)
*/
- public Object getSerializedValue(LocalRegion localRegion,
- KeyInfo key,
- boolean doNotLockEntry,
- ClientProxyMembershipID requestingClient,
- EntryEventImpl clientEvent,
- boolean returnTombstones) throws DataLocationException {
+ public Object getSerializedValue(LocalRegion localRegion, KeyInfo key, boolean doNotLockEntry, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws DataLocationException {
// Unconditionally rejects the call: no argument, including the new
// allowReadFromHDFS flag, is consulted before throwing.
throw new IllegalStateException();
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/NonLocalRegionEntry.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/NonLocalRegionEntry.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/NonLocalRegionEntry.java
index 4c1fa7f..bb83383 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/NonLocalRegionEntry.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/NonLocalRegionEntry.java
@@ -461,6 +461,26 @@ public class NonLocalRegionEntry implements RegionEntry, VersionStamp {
}
@Override
+ public boolean isMarkedForEviction() {
+ // Always throws: the eviction mark cannot be queried on this entry type;
+ // the exception carries the localized "not appropriate" message.
+ throw new UnsupportedOperationException(LocalizedStrings
+ .PartitionedRegion_NOT_APPROPRIATE_FOR_PARTITIONEDREGIONNONLOCALREGIONENTRY
+ .toLocalizedString());
+ }
+ @Override
+ public void setMarkedForEviction() {
+ // Always throws: the eviction mark cannot be set on this entry type;
+ // the exception carries the localized "not appropriate" message.
+ throw new UnsupportedOperationException(LocalizedStrings
+ .PartitionedRegion_NOT_APPROPRIATE_FOR_PARTITIONEDREGIONNONLOCALREGIONENTRY
+ .toLocalizedString());
+ }
+
+ @Override
+ public void clearMarkedForEviction() {
+ // Always throws: the eviction mark cannot be cleared on this entry type;
+ // the exception carries the localized "not appropriate" message.
+ throw new UnsupportedOperationException(LocalizedStrings
+ .PartitionedRegion_NOT_APPROPRIATE_FOR_PARTITIONEDREGIONNONLOCALREGIONENTRY
+ .toLocalizedString());
+ }
+
+ @Override
public boolean isValueNull() {
// True exactly when getValueAsToken() yields null.
return (null == getValueAsToken());
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/Oplog.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/Oplog.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/Oplog.java
index 4728594..fe8813e 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/Oplog.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/Oplog.java
@@ -7384,6 +7384,19 @@ public final class Oplog implements CompactableOplog, Flushable {
// TODO Auto-generated method stub
}
@Override
+ public boolean isMarkedForEviction() {
+ // Placeholder: no eviction mark is tracked by this implementation, so it
+ // always reports false.
+ // TODO Auto-generated method stub
+ return false;
+ }
+ @Override
+ public void setMarkedForEviction() {
+ // Placeholder: intentionally empty, the call has no effect here.
+ // TODO Auto-generated method stub
+ }
+ @Override
+ public void clearMarkedForEviction() {
+ // Placeholder: intentionally empty, the call has no effect here.
+ // TODO Auto-generated method stub
+ }
+ @Override
public boolean isInvalid() {
// TODO Auto-generated method stub
return false;