You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/04/29 01:10:04 UTC
[25/50] [abbrv] incubator-geode git commit: GEODE-1072: Removing HDFS related code
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HARegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HARegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HARegion.java
index 3896800..c924be5 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HARegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HARegion.java
@@ -373,13 +373,20 @@ public final class HARegion extends DistributedRegion
/**
* @return the deserialized value
- * @see DistributedRegion#findObjectInSystem(KeyInfo, boolean, TXStateInterface, boolean, Object, boolean, boolean, ClientProxyMembershipID, EntryEventImpl, boolean, boolean)
+ * @see LocalRegion#findObjectInSystem(KeyInfo, boolean, TXStateInterface, boolean, Object, boolean, boolean, ClientProxyMembershipID, EntryEventImpl, boolean)
*
*/
@Override
- protected Object findObjectInSystem(KeyInfo keyInfo, boolean isCreate,
- TXStateInterface txState, boolean generateCallbacks, Object localValue, boolean disableCopyOnRead,
- boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS)
+ protected Object findObjectInSystem(KeyInfo keyInfo,
+ boolean isCreate,
+ TXStateInterface txState,
+ boolean generateCallbacks,
+ Object localValue,
+ boolean disableCopyOnRead,
+ boolean preferCD,
+ ClientProxyMembershipID requestingClient,
+ EntryEventImpl clientEvent,
+ boolean returnTombstones)
throws CacheLoaderException, TimeoutException {
Object value = null;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSLRURegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSLRURegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSLRURegionMap.java
deleted file mode 100644
index f6c6aa7..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSLRURegionMap.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache;
-
-import java.util.Collection;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.cache.CacheWriterException;
-import com.gemstone.gemfire.cache.TimeoutException;
-import com.gemstone.gemfire.i18n.LogWriterI18n;
-import com.gemstone.gemfire.internal.cache.lru.EnableLRU;
-import com.gemstone.gemfire.internal.cache.lru.LRUEntry;
-import com.gemstone.gemfire.internal.cache.lru.NewLRUClockHand;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.logging.LogService;
-
-/**
- * Implementation of RegionMap that reads data from HDFS and adds LRU behavior
- *
- */
-public class HDFSLRURegionMap extends AbstractLRURegionMap implements HDFSRegionMap {
-
- private static final Logger logger = LogService.getLogger();
-
- private final HDFSRegionMapDelegate delegate;
-
- /**
- * A tool from the eviction controller for sizing entries and
- * expressing limits.
- */
- private EnableLRU ccHelper;
-
- /** The list of nodes in LRU order */
- private NewLRUClockHand lruList;
-
- private static final boolean DEBUG = Boolean.getBoolean("hdfsRegionMap.DEBUG");
-
- public HDFSLRURegionMap(LocalRegion owner, Attributes attrs,
- InternalRegionArguments internalRegionArgs) {
- super(internalRegionArgs);
- assert owner instanceof BucketRegion;
- initialize(owner, attrs, internalRegionArgs);
- this.delegate = new HDFSRegionMapDelegate(owner, attrs, internalRegionArgs, this);
- }
-
- @Override
- public RegionEntry getEntry(Object key) {
- return delegate.getEntry(key, null);
- }
-
- @Override
- protected RegionEntry getEntry(EntryEventImpl event) {
- return delegate.getEntry(event);
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public Collection<RegionEntry> regionEntries() {
- return delegate.regionEntries();
- }
-
- @Override
- public int size() {
- return delegate.size();
- }
-
- @Override
- public boolean isEmpty() {
- return delegate.isEmpty();
- }
-
- @Override
- protected void _setCCHelper(EnableLRU ccHelper) {
- this.ccHelper = ccHelper;
- }
-
- @Override
- protected EnableLRU _getCCHelper() {
- return this.ccHelper;
- }
-
- @Override
- protected void _setLruList(NewLRUClockHand lruList) {
- this.lruList = lruList;
- }
-
- @Override
- protected NewLRUClockHand _getLruList() {
- return this.lruList;
- }
-
- @Override
- public HDFSRegionMapDelegate getDelegate() {
- return this.delegate;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMap.java
deleted file mode 100644
index 2a7baef..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMap.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache;
-
-/**
- * Interface implemented by RegionMap implementations that
- * read from HDFS.
- *
- *
- */
-public interface HDFSRegionMap {
-
- /**
- * @return the {@link HDFSRegionMapDelegate} that does
- * all the work
- */
- public HDFSRegionMapDelegate getDelegate();
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMapDelegate.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMapDelegate.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMapDelegate.java
deleted file mode 100644
index a2ef653..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMapDelegate.java
+++ /dev/null
@@ -1,540 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache;
-
-import java.io.IOException;
-import java.lang.ref.Reference;
-import java.lang.ref.ReferenceQueue;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.cache.EvictionAction;
-import com.gemstone.gemfire.cache.CustomEvictionAttributes;
-import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
-import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSEntriesSet;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSEntriesSet.HDFSIterator;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSGatewayEventImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSParallelGatewaySenderQueue;
-import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.SortedHoplogPersistedEvent;
-import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
-import com.gemstone.gemfire.i18n.LogWriterI18n;
-import com.gemstone.gemfire.internal.cache.LocalRegion.IteratorType;
-import com.gemstone.gemfire.internal.cache.RegionMap.Attributes;
-import com.gemstone.gemfire.internal.cache.lru.EnableLRU;
-import com.gemstone.gemfire.internal.cache.lru.LRUEntry;
-import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector;
-import com.gemstone.gemfire.internal.cache.versions.VersionStamp;
-import com.gemstone.gemfire.internal.cache.versions.VersionTag;
-import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
-import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
-import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
-import com.gemstone.gemfire.internal.util.concurrent.FutureResult;
-
-/**
- * This class encapsulates all the functionality of HDFSRegionMap, so
- * that it can be provided to HDFSLRURegionMap.
- *
- */
-public class HDFSRegionMapDelegate {
-
- private static final Logger logger = LogService.getLogger();
-
- private final BucketRegion owner;
-
- private ConcurrentParallelGatewaySenderQueue hdfsQueue;
-
- private final RegionMap backingRM;
-
- /** queue of dead iterators */
- private final ReferenceQueue<HDFSIterator> refs;
-
- private static final boolean DEBUG = Boolean.getBoolean("hdfsRegionMap.DEBUG");
-
- /**
- * used for serializing fetches from HDFS
- */
- private ConcurrentMap<Object, FutureResult> futures = new ConcurrentHashMap<Object, FutureResult>();
-
- public HDFSRegionMapDelegate(LocalRegion owner, Attributes attrs,
- InternalRegionArguments internalRegionArgs, RegionMap backingRM) {
- assert owner instanceof BucketRegion;
- this.owner = (BucketRegion) owner;
- this.backingRM = backingRM;
- refs = new ReferenceQueue<HDFSEntriesSet.HDFSIterator>();
- }
-
- public RegionEntry getEntry(Object key, EntryEventImpl event) {
-
- RegionEntry re = getEntry(key, event, true);
- // get from tx should put the entry back in map
- // it should be evicted once tx completes
- /**MergeGemXDHDFSToGFE txstate does not apply for this*/
- /* if (re != null && getTXState(event) != null) {
- if (re != null) {
- // put the region entry in backing CHM of AbstractRegionMap so that
- // it can be locked in basicPut/destroy
- RegionEntry oldRe = backingRM.putEntryIfAbsent(key, re);
- if (oldRe != null) {
- if (re instanceof OffHeapRegionEntry && !oldRe.equals(re)) {
- ((OffHeapRegionEntry)re).release();
- }
- return oldRe;
- }
- re.setMarkedForEviction();
- owner.updateSizeOnCreate(key,
- owner.calculateRegionEntryValueSize(re));
- ((AbstractRegionMap)backingRM).incEntryCount(1);
- ((AbstractRegionMap)backingRM).lruEntryCreate(re);
- }*/
- return re;
- }
-
- /*
- private TXStateInterface getTXState(EntryEventImpl event) {
- return event != null ? event.getTXState(this.owner) : this.owner
- .getTXState();
- }*/
-
- /**
- *
- * @param key
- * @param event
- * @param forceOnHeap if true will return heap version of off-heap region entries
- */
- private RegionEntry getEntry(Object key, EntryEventImpl event, boolean forceOnHeap) {
- closeDeadIterators();
-
- RegionEntry re = backingRM.getEntryInVM(key);
- if (logger.isTraceEnabled() || DEBUG) {
- logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "HDFS: Found the key in CHM: " + key
- + " ,value=" + (re == null? "null" : "[" + re._getValue() + " or (" + re.getValueAsToken() + ")]")));
- }
- if ((re == null || (re.isRemoved() && !re.isTombstone()))
- && owner.getBucketAdvisor().isPrimary() && allowReadFromHDFS()) {
- if (logger.isTraceEnabled() || DEBUG) {
- logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "HDFS: fetching from hdfs key:" + key));
- }
- try {
- this.owner.getPartitionedRegion().hdfsCalled(key);
- re = getEntryFromFuture(key);
- if (re != null) {
- return re;
- }
-
- assert this.owner.getPartitionedRegion().getDataPolicy()
- .withHDFS();
- byte[] k = EntryEventImpl.serialize(key);
-
- // for destroy ops we will retain the entry in the region map so
- // tombstones can be tracked
- //final boolean forceOnHeap = (event==null || !event.getOperation().isDestroy());
-
- // get from queue
- re = getFromHDFSQueue(key, k, forceOnHeap);
- if (re == null) {
- // get from HDFS
- re = getFromHDFS(key, k, forceOnHeap);
- }
- if (re != null && re.isTombstone()) {
- RegionVersionVector vector = this.owner.getVersionVector();
-// if (vector == null) {
-// this.owner.getLogWriterI18n().info(LocalizedStrings.DEBUG,
-// "found a tombstone in a region w/o a version vector: " + re + "; region: " + this.owner);
-// }
- if (vector == null
- || vector.isTombstoneTooOld(re.getVersionStamp().getMemberID(),
- re.getVersionStamp().getRegionVersion())) {
- re = null;
- }
- }
- if (logger.isTraceEnabled() || DEBUG) {
- logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "HDFS: returning from hdfs re:" + re));
- }
- } catch (ForceReattemptException e) {
- throw new PrimaryBucketException(e.getLocalizedMessage(), e);
- } catch (IOException e) {
- throw new HDFSIOException("Error reading from HDFS", e);
- } finally {
- notifyFuture(key, re);
- // If we mark it here, the table scan may miss it causing updates/delete using table
- // scan to fail.
-// if (re != null) {
-// re.setMarkedForEviction();
-// }
- if(re != null && event != null && !re.isTombstone()) {
- if (logger.isTraceEnabled() || DEBUG) {
- logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "HDFS: loaded from hdfs re:" + re));
- }
- BucketRegion br = (BucketRegion)owner;
- //CustomEvictionAttributes csAttr = br.getCustomEvictionAttributes();
- //if(csAttr!=null)
- event.setLoadedFromHDFS(true);
- }
- }
- }
- if(re!=null && re.isMarkedForEviction() && !re.isTombstone()) {
- if(event!=null) {
- event.setLoadedFromHDFS(true);
- }
- }
-
- return re;
- }
-
- /**
- * This method returns true if the RegionEntry should be read from HDFS.
- * fixes #49101 by not allowing reads from HDFS for persistent regions
- * that do not define an eviction criteria.
- *
- * @return true if RegionEntry should be read from HDFS
- */
- private boolean allowReadFromHDFS() {
- if (!owner.getDataPolicy().withPersistence()
- || owner.getCustomEvictionAttributes() != null
- || isEvictionActionLocalDestroy()){
- /**MergeGemXDHDFSToGFE this is used for global index. Hence not required here*/
- //|| owner.isUsedForIndex()) {
- // when region does not have persistence, we have to read from HDFS (even
- // though there is no eviction criteria) for constraint checks
- return true;
- }
- return false;
- }
-
- private boolean isEvictionActionLocalDestroy() {
- PartitionedRegion pr = owner.getPartitionedRegion();
- if (pr.getEvictionAttributes() != null) {
- return pr.getEvictionAttributes().getAction() == EvictionAction.LOCAL_DESTROY;
- }
- return false;
- }
-
- protected RegionEntry getEntry(EntryEventImpl event) {
- RegionEntry re = getEntry(event.getKey(), event, false);
- if (re != null && event.isLoadedFromHDFS()) {
- // put the region entry in backing CHM of AbstractRegionMap so that
- // it can be locked in basicPut/destroy
- RegionEntry oldRe = backingRM.putEntryIfAbsent(event.getKey(), re);
- if (oldRe != null) {
- if (re instanceof OffHeapRegionEntry && !oldRe.equals(re)) {
- ((OffHeapRegionEntry) re).release();
- }
- return oldRe;
- }
- // since the entry is faulted in from HDFS, it must have
- // satisfied the eviction criteria in the past, so mark it for eviction
- re.setMarkedForEviction();
-
- owner.updateSizeOnCreate(event.getKey(), owner.calculateRegionEntryValueSize(re));
- ((AbstractRegionMap) backingRM).incEntryCount(1);
- ((AbstractRegionMap) backingRM).lruEntryCreate(re);
- }
- return re;
- }
-
- @SuppressWarnings("unchecked")
- public Collection<RegionEntry> regionEntries() {
- closeDeadIterators();
- if (!owner.getPartitionedRegion().includeHDFSResults()) {
- if (logger.isDebugEnabled() || DEBUG) {
- logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Ignoring HDFS results for #regionEntries"));
- }
- return backingRM.regionEntriesInVM();
- }
-
- try {
- return createEntriesSet(IteratorType.ENTRIES);
- } catch (ForceReattemptException e) {
- throw new PrimaryBucketException(e.getLocalizedMessage(), e);
- }
- }
-
- public int size() {
- closeDeadIterators();
- if (!owner.getPartitionedRegion().includeHDFSResults()) {
- if (logger.isDebugEnabled() || DEBUG) {
- logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Ignoring HDFS results for #size"));
- }
- return backingRM.sizeInVM();
- }
-
- try {
- return createEntriesSet(IteratorType.KEYS).size();
- } catch (ForceReattemptException e) {
- throw new PrimaryBucketException(e.getLocalizedMessage(), e);
- }
- }
-
- public boolean isEmpty() {
- closeDeadIterators();
- if (!owner.getPartitionedRegion().includeHDFSResults()) {
- if (logger.isDebugEnabled() || DEBUG) {
- logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Ignoring HDFS results for #isEmpty"));
- }
- return backingRM.sizeInVM() == 0;
- }
-
- try {
- return createEntriesSet(IteratorType.KEYS).isEmpty();
- } catch (ForceReattemptException e) {
- throw new PrimaryBucketException(e.getLocalizedMessage(), e);
- }
- }
-
- private void notifyFuture(Object key, RegionEntry re) {
- FutureResult future = this.futures.remove(key);
- if (future != null) {
- future.set(re);
- }
- }
-
- private RegionEntry getEntryFromFuture(Object key) {
- FutureResult future = new FutureResult(this.owner.getCancelCriterion());
- FutureResult old = this.futures.putIfAbsent(key, future);
- if (old != null) {
- if (logger.isTraceEnabled() || DEBUG) {
- logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "HDFS: waiting for concurrent fetch to complete for key:" + key));
- }
- try {
- return (RegionEntry) old.get();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- this.owner.getCache().getCancelCriterion().checkCancelInProgress(null);
- }
- }
- return null;
- }
-
- private RegionEntry getFromHDFS(Object key, byte[] k, boolean forceOnHeap) throws IOException, ForceReattemptException {
- SortedHoplogPersistedEvent ev;
- try {
- ev = (SortedHoplogPersistedEvent) owner.getHoplogOrganizer().read(k);
- } catch (IOException e) {
- owner.checkForPrimary();
- throw e;
- }
- if (ev != null) {
- if (logger.isTraceEnabled() || DEBUG) {
- logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "HDFS: got from hdfs ev:" + ev));
- }
- return getEntryFromEvent(key, ev, forceOnHeap, false);
- }
- return null;
- }
-
- /**
- * set the versionTag on the newly faulted-in entry
- */
- private void setVersionTag(RegionEntry re, VersionTag versionTag) {
- if (owner.concurrencyChecksEnabled) {
- versionTag.setMemberID(
- owner.getVersionVector().getCanonicalId(versionTag.getMemberID()));
- VersionStamp versionedRe = (VersionStamp) re;
- versionedRe.setVersions(versionTag);
- }
- }
-
- private RegionEntry getFromHDFSQueue(Object key, byte[] k, boolean forceOnHeap) throws ForceReattemptException {
- ConcurrentParallelGatewaySenderQueue q = getHDFSQueue();
- if (q == null) return null;
- HDFSGatewayEventImpl hdfsGatewayEvent = (HDFSGatewayEventImpl) q.get(owner.getPartitionedRegion(), k, owner.getId());
- if (hdfsGatewayEvent != null) {
- if (logger.isTraceEnabled() || DEBUG) {
- logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "HDFS: got from hdfs queue: " + hdfsGatewayEvent));
- }
- return getEntryFromEvent(key, hdfsGatewayEvent, forceOnHeap, false);
- }
- return null;
- }
-
- private ConcurrentParallelGatewaySenderQueue getHDFSQueue()
- throws ForceReattemptException {
- if (this.hdfsQueue == null) {
- String asyncQId = this.owner.getPartitionedRegion().getHDFSEventQueueName();
- final AsyncEventQueueImpl asyncQ = (AsyncEventQueueImpl)this.owner.getCache().getAsyncEventQueue(asyncQId);
- final AbstractGatewaySender gatewaySender = (AbstractGatewaySender)asyncQ.getSender();
- AbstractGatewaySenderEventProcessor ep = gatewaySender.getEventProcessor();
- if (ep == null) return null;
- hdfsQueue = (ConcurrentParallelGatewaySenderQueue)ep.getQueue();
- }
-
- // Check whether the queue has become primary here.
- // There could be some time between bucket becoming a primary
- // and underlying queue becoming a primary, so isPrimaryWithWait()
- // waits for some time for the queue to become a primary on this member
- final HDFSBucketRegionQueue brq = hdfsQueue.getBucketRegionQueue(
- this.owner.getPartitionedRegion(), this.owner.getId());
- if (brq != null) {
- if (owner.getBucketAdvisor().isPrimary()
- && !brq.getBucketAdvisor().isPrimaryWithWait()) {
- InternalDistributedMember primaryHolder = brq.getBucketAdvisor()
- .basicGetPrimaryMember();
- throw new PrimaryBucketException("Bucket " + brq.getName()
- + " is not primary. Current primary holder is " + primaryHolder);
- }
- }
-
- return hdfsQueue;
- }
-
- public RegionEntry getEntryFromEvent(Object key, HDFSGatewayEventImpl event, boolean forceOnHeap, boolean forUpdate) {
- Object val;
- if (event.getOperation().isDestroy()) {
- val = Token.TOMBSTONE;
- } else if (event.getOperation().isInvalidate()) {
- val = Token.INVALID;
- } else {
- val = event.getValue();
- }
- RegionEntry re = null;
- final TXStateInterface tx = owner.getTXState();
- if (tx == null) {
- re = createRegionEntry(key, val, event.getVersionTag(), forceOnHeap);
- return re;
- }
- else
- if (val != null) {
- if (((re = this.backingRM.getEntryInVM(key)) == null)
- || (re.isRemoved() && !re.isTombstone())) {
- boolean shouldCreateOnHeapEntry = !(owner.getOffHeap() && forUpdate);
- re = createRegionEntry(key, val, event.getVersionTag(), shouldCreateOnHeapEntry);
- if (forUpdate) {
- if (re != null && tx != null) {
- // put the region entry in backing CHM of AbstractRegionMap so that
- // it can be locked in basicPut/destroy
- RegionEntry oldRe = backingRM.putEntryIfAbsent(key, re);
- if (oldRe != null) {
- if (re instanceof OffHeapRegionEntry && !oldRe.equals(re)) {
- ((OffHeapRegionEntry)re).release();
- }
- return oldRe;
- }
- re.setMarkedForEviction();
- owner.updateSizeOnCreate(key,
- owner.calculateRegionEntryValueSize(re));
- ((AbstractRegionMap)backingRM).incEntryCount(1);
- ((AbstractRegionMap)backingRM).lruEntryCreate(re);
- }
- }
- }
- }
- return re;
- }
-
- public RegionEntry getEntryFromEvent(Object key, SortedHoplogPersistedEvent event, boolean forceOnHeap, boolean forUpdate) {
- Object val = getValueFromEvent(event);
- RegionEntry re = null;
- final TXStateInterface tx = owner.getTXState();
- if (tx == null) {
- re = createRegionEntry(key, val, event.getVersionTag(), forceOnHeap);
- return re;
- }
- else // FOR TX case, we need to create off heap entry if required
- if (val != null) {
- if (((re = this.backingRM.getEntryInVM(key)) == null)
- || (re.isRemoved() && !re.isTombstone())) {
- boolean shouldCreateOnHeapEntry = !(owner.getOffHeap() && forUpdate);
- re = createRegionEntry(key, val, event.getVersionTag(), shouldCreateOnHeapEntry);
- if(forUpdate) {
- if (re != null && tx != null) {
- // put the region entry in backing CHM of AbstractRegionMap so that
- // it can be locked in basicPut/destroy
- RegionEntry oldRe = backingRM.putEntryIfAbsent(key, re);
- if (oldRe != null) {
- if (re instanceof OffHeapRegionEntry && !oldRe.equals(re)) {
- ((OffHeapRegionEntry)re).release();
- }
- return oldRe;
- }
- re.setMarkedForEviction();
- owner.updateSizeOnCreate(key,
- owner.calculateRegionEntryValueSize(re));
- ((AbstractRegionMap)backingRM).incEntryCount(1);
- ((AbstractRegionMap)backingRM).lruEntryCreate(re);
- }
- }
- }
- }
- return re;
- }
-
- private RegionEntry createRegionEntry(Object key, Object value, VersionTag tag, boolean forceOnHeap) {
- RegionEntryFactory ref = backingRM.getEntryFactory();
- if (forceOnHeap) {
- ref = ref.makeOnHeap();
- }
- value = getValueDuringGII(key, value);
- RegionEntry re = ref.createEntry(this.owner, key, value);
- setVersionTag(re, tag);
- if (re instanceof LRUEntry) {
- assert backingRM instanceof AbstractLRURegionMap;
- EnableLRU ccHelper = ((AbstractLRURegionMap)backingRM)._getCCHelper();
- ((LRUEntry)re).updateEntrySize(ccHelper);
- }
- return re;
- }
-
- private Object getValueDuringGII(Object key, Object value) {
- if (owner.getIndexUpdater() != null && !owner.isInitialized()) {
- return AbstractRegionMap.listOfDeltasCreator.newValue(key, owner, value,
- null);
- }
- return value;
- }
-
- private Set createEntriesSet(IteratorType type)
- throws ForceReattemptException {
- ConcurrentParallelGatewaySenderQueue q = getHDFSQueue();
- if (q == null) return Collections.emptySet();
- HDFSBucketRegionQueue brq = q.getBucketRegionQueue(this.owner.getPartitionedRegion(), owner.getId());
- return new HDFSEntriesSet(owner, brq, owner.getHoplogOrganizer(), type, refs);
- }
-
- private void closeDeadIterators() {
- Reference<? extends HDFSIterator> weak;
- while ((weak = refs.poll()) != null) {
- if (logger.isTraceEnabled() || DEBUG) {
- logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Closing weak ref for iterator "
- + weak.get()));
- }
- weak.get().close();
- }
- }
-
- /**
- * gets the value from event, deserializing if necessary.
- */
- private Object getValueFromEvent(PersistedEventImpl ev) {
- if (ev.getOperation().isDestroy()) {
- return Token.TOMBSTONE;
- } else if (ev.getOperation().isInvalidate()) {
- return Token.INVALID;
- }
- return ev.getValue();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMapImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMapImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMapImpl.java
deleted file mode 100644
index 9336ed7..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMapImpl.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache;
-
-import java.util.Collection;
-
-import com.gemstone.gemfire.cache.CacheWriterException;
-import com.gemstone.gemfire.cache.TimeoutException;
-import com.gemstone.gemfire.internal.size.SingleObjectSizer;
-
-/**
- * Implementation of RegionMap that reads data from HDFS.
- *
- */
-public class HDFSRegionMapImpl extends AbstractRegionMap implements HDFSRegionMap {
-
- private final HDFSRegionMapDelegate delegate;
-
- private static final boolean DEBUG = Boolean.getBoolean("hdfsRegionMap.DEBUG");
-
- public HDFSRegionMapImpl(LocalRegion owner, Attributes attrs,
- InternalRegionArguments internalRegionArgs) {
- super(internalRegionArgs);
- assert owner instanceof BucketRegion;
- initialize(owner, attrs, internalRegionArgs, false);
- this.delegate = new HDFSRegionMapDelegate(owner, attrs, internalRegionArgs, this);
- }
-
- @Override
- public RegionEntry getEntry(Object key) {
- return delegate.getEntry(key, null);
- }
-
- @Override
- protected RegionEntry getEntry(EntryEventImpl event) {
- return delegate.getEntry(event);
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public Collection<RegionEntry> regionEntries() {
- return delegate.regionEntries();
- }
-
- @Override
- public int size() {
- return delegate.size();
- }
-
- @Override
- public boolean isEmpty() {
- return delegate.isEmpty();
- }
-
- @Override
- public HDFSRegionMapDelegate getDelegate() {
- return this.delegate;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalCache.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalCache.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalCache.java
index 36eee80..bda5a27 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalCache.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalCache.java
@@ -20,8 +20,6 @@ package com.gemstone.gemfire.internal.cache;
import java.util.Collection;
import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSStoreDirector;
import com.gemstone.gemfire.cache.query.internal.cq.CqService;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.internal.cache.extension.Extensible;
@@ -45,7 +43,5 @@ public interface InternalCache extends Cache, Extensible<Cache> {
public CqService getCqService();
- public Collection<HDFSStoreImpl> getHDFSStores() ;
-
public <T extends CacheService> T getService(Class<T> clazz);
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalDataView.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalDataView.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalDataView.java
index e506f2e..0885477 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalDataView.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalDataView.java
@@ -39,17 +39,22 @@ public interface InternalDataView {
* @param keyInfo
* @param localRegion
* @param updateStats
- * @param disableCopyOnRead
- * @param preferCD
+ * @param disableCopyOnRead
+ * @param preferCD
* @param clientEvent TODO
* @param returnTombstones TODO
* @param retainResult if true then the result may be a retained off-heap reference
* @return the object associated with the key
*/
@Retained
- Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion,
- boolean updateStats, boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent,
- boolean returnTombstones, boolean allowReadFromHDFS, boolean retainResult);
+ Object getDeserializedValue(KeyInfo keyInfo,
+ LocalRegion localRegion,
+ boolean updateStats,
+ boolean disableCopyOnRead,
+ boolean preferCD,
+ EntryEventImpl clientEvent,
+ boolean returnTombstones,
+ boolean retainResult);
/**
* @param event
@@ -182,8 +187,8 @@ public interface InternalDataView {
* @return the Object associated with the key
*/
Object findObject(KeyInfo key, LocalRegion r, boolean isCreate, boolean generateCallbacks,
- Object value, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient,
- EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS);
+ Object value, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient,
+ EntryEventImpl clientEvent, boolean returnTombstones);
/**
@@ -224,13 +229,18 @@ public interface InternalDataView {
*
* @param localRegion
* @param key
- * @param doNotLockEntry
+ * @param doNotLockEntry
* @param requestingClient the client that made the request, or null if not from a client
* @param clientEvent the client event, if any
* @param returnTombstones TODO
* @return the serialized value from the cache
*/
- Object getSerializedValue(LocalRegion localRegion, KeyInfo key, boolean doNotLockEntry, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws DataLocationException;
+ Object getSerializedValue(LocalRegion localRegion,
+ KeyInfo key,
+ boolean doNotLockEntry,
+ ClientProxyMembershipID requestingClient,
+ EntryEventImpl clientEvent,
+ boolean returnTombstones) throws DataLocationException;
abstract void checkSupportsRegionDestroy() throws UnsupportedOperationInTransactionException;
abstract void checkSupportsRegionInvalidate() throws UnsupportedOperationInTransactionException;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalRegionArguments.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalRegionArguments.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalRegionArguments.java
index 41e763d..f7d46fe 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalRegionArguments.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalRegionArguments.java
@@ -37,7 +37,6 @@ public final class InternalRegionArguments
private boolean isUsedForPartitionedRegionAdmin;
private boolean isUsedForSerialGatewaySenderQueue;
private boolean isUsedForParallelGatewaySenderQueue;
- private boolean isUsedForHDFSParallelGatewaySenderQueue = false;
private int bucketRedundancy;
private boolean isUsedForPartitionedRegionBucket;
private RegionAdvisor partitionedRegionAdvisor;
@@ -273,26 +272,11 @@ public final class InternalRegionArguments
this.isUsedForParallelGatewaySenderQueue = queueFlag;
return this;
}
- public InternalRegionArguments setIsUsedForHDFSParallelGatewaySenderQueue(
- boolean queueFlag) {
- this.isUsedForHDFSParallelGatewaySenderQueue = queueFlag;
- return this;
- }
public boolean isUsedForParallelGatewaySenderQueue() {
return this.isUsedForParallelGatewaySenderQueue;
}
- public boolean isUsedForHDFSParallelGatewaySenderQueue() {
- return this.isUsedForHDFSParallelGatewaySenderQueue;
- }
-
- public boolean isReadWriteHDFSRegion() {
- return isUsedForPartitionedRegionBucket()
- && getPartitionedRegion().getHDFSStoreName() != null
- && !getPartitionedRegion().getHDFSWriteOnly();
- }
-
public InternalRegionArguments setParallelGatewaySender(
AbstractGatewaySender pgSender) {
this.parallelGatewaySender = pgSender;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
index b3de9b7..3ad294c 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
@@ -116,11 +116,6 @@ import com.gemstone.gemfire.cache.client.internal.ServerRegionProxy;
import com.gemstone.gemfire.cache.control.ResourceManager;
import com.gemstone.gemfire.cache.execute.Function;
import com.gemstone.gemfire.cache.execute.ResultCollector;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSIntegrationUtil;
-import com.gemstone.gemfire.cache.hdfs.internal.HoplogListenerForRegion;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
import com.gemstone.gemfire.cache.partition.PartitionRegionHelper;
import com.gemstone.gemfire.cache.query.FunctionDomainException;
import com.gemstone.gemfire.cache.query.Index;
@@ -465,10 +460,6 @@ public class LocalRegion extends AbstractRegion
// Lock for updating PR MetaData on client side
public final Lock clientMetaDataLock = new ReentrantLock();
-
- protected HdfsRegionManager hdfsManager;
- protected HoplogListenerForRegion hoplogListener;
-
/**
* There seem to be cases where a region can be created and yet the
* distributed system is not yet in place...
@@ -641,7 +632,6 @@ public class LocalRegion extends AbstractRegion
}
}
- this.hdfsManager = initHDFSManager();
this.dsi = findDiskStore(attrs, internalRegionArgs);
this.diskRegion = createDiskRegion(internalRegionArgs);
this.entries = createRegionMap(internalRegionArgs);
@@ -696,22 +686,8 @@ public class LocalRegion extends AbstractRegion
}
- private HdfsRegionManager initHDFSManager() {
- HdfsRegionManager hdfsMgr = null;
- if (this.getHDFSStoreName() != null) {
- this.hoplogListener = new HoplogListenerForRegion();
- HDFSRegionDirector.getInstance().setCache(cache);
- hdfsMgr = HDFSRegionDirector.getInstance().manageRegion(this,
- this.getHDFSStoreName(), hoplogListener);
- }
- return hdfsMgr;
- }
-
private RegionMap createRegionMap(InternalRegionArguments internalRegionArgs) {
RegionMap result = null;
- if ((internalRegionArgs.isReadWriteHDFSRegion()) && this.diskRegion != null) {
- this.diskRegion.setEntriesMapIncompatible(true);
- }
if (this.diskRegion != null) {
result = this.diskRegion.useExistingRegionMap(this);
}
@@ -977,11 +953,6 @@ public class LocalRegion extends AbstractRegion
existing = (LocalRegion)this.subregions.get(subregionName);
if (existing == null) {
- // create the async queue for HDFS if required.
- HDFSIntegrationUtil.createAndAddAsyncQueue(regionPath,
- regionAttributes, this.cache);
- regionAttributes = cache.setEvictionAttributesForLargeRegion(
- regionAttributes);
if (regionAttributes.getScope().isDistributed()
&& internalRegionArgs.isUsedForPartitionedRegionBucket()) {
final PartitionedRegion pr = internalRegionArgs
@@ -991,15 +962,8 @@ public class LocalRegion extends AbstractRegion
internalRegionArgs.setKeyRequiresRegionContext(pr
.keyRequiresRegionContext());
if (pr.isShadowPR()) {
- if (!pr.isShadowPRForHDFS()) {
- newRegion = new BucketRegionQueue(subregionName, regionAttributes,
- this, this.cache, internalRegionArgs);
- }
- else {
- newRegion = new HDFSBucketRegionQueue(subregionName, regionAttributes,
- this, this.cache, internalRegionArgs);
- }
-
+ newRegion = new BucketRegionQueue(subregionName, regionAttributes,
+ this, this.cache, internalRegionArgs);
} else {
newRegion = new BucketRegion(subregionName, regionAttributes,
this, this.cache, internalRegionArgs);
@@ -1134,7 +1098,6 @@ public class LocalRegion extends AbstractRegion
if (event.getEventId() == null && generateEventID()) {
event.setNewEventId(cache.getDistributedSystem());
}
- assert event.isFetchFromHDFS() : "validatedPut() should have been called";
// Fix for 42448 - Only make create with null a local invalidate for
// normal regions. Otherwise, it will become a distributed invalidate.
if (getDataPolicy() == DataPolicy.NORMAL) {
@@ -1261,18 +1224,20 @@ public class LocalRegion extends AbstractRegion
* @param retainResult if true then the result may be a retained off-heap reference
* @return the value for the given key
*/
- public final Object getDeserializedValue(RegionEntry re, final KeyInfo keyInfo, final boolean updateStats, boolean disableCopyOnRead,
- boolean preferCD, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS, boolean retainResult) {
+ public final Object getDeserializedValue(RegionEntry re,
+ final KeyInfo keyInfo,
+ final boolean updateStats,
+ boolean disableCopyOnRead,
+ boolean preferCD,
+ EntryEventImpl clientEvent,
+ boolean returnTombstones,
+ boolean retainResult) {
if (this.diskRegion != null) {
this.diskRegion.setClearCountReference();
}
try {
if (re == null) {
- if (allowReadFromHDFS) {
- re = this.entries.getEntry(keyInfo.getKey());
- } else {
- re = this.entries.getOperationalEntryInVM(keyInfo.getKey());
- }
+ re = this.entries.getEntry(keyInfo.getKey());
}
//skip updating the stats if the value is null
// TODO - We need to clean up the callers of the this class so that we can
@@ -1382,7 +1347,7 @@ public class LocalRegion extends AbstractRegion
public Object get(Object key, Object aCallbackArgument,
boolean generateCallbacks, EntryEventImpl clientEvent) throws TimeoutException, CacheLoaderException
{
- Object result = get(key, aCallbackArgument, generateCallbacks, false, false, null, clientEvent, false, true/*allowReadFromHDFS*/);
+ Object result = get(key, aCallbackArgument, generateCallbacks, false, false, null, clientEvent, false);
if (Token.isInvalid(result)) {
result = null;
}
@@ -1392,11 +1357,16 @@ public class LocalRegion extends AbstractRegion
/*
* @see BucketRegion#getSerialized(KeyInfo, boolean, boolean)
*/
- public Object get(Object key, Object aCallbackArgument,
- boolean generateCallbacks, boolean disableCopyOnRead, boolean preferCD,
- ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws TimeoutException, CacheLoaderException {
+ public Object get(Object key,
+ Object aCallbackArgument,
+ boolean generateCallbacks,
+ boolean disableCopyOnRead,
+ boolean preferCD,
+ ClientProxyMembershipID requestingClient,
+ EntryEventImpl clientEvent,
+ boolean returnTombstones) throws TimeoutException, CacheLoaderException {
return get(key, aCallbackArgument,
- generateCallbacks, disableCopyOnRead, preferCD,requestingClient, clientEvent, returnTombstones, false, allowReadFromHDFS, false);
+ generateCallbacks, disableCopyOnRead, preferCD,requestingClient, clientEvent, returnTombstones, false, false);
}
/**
@@ -1418,16 +1388,17 @@ public class LocalRegion extends AbstractRegion
public Object getRetained(Object key, Object aCallbackArgument,
boolean generateCallbacks, boolean disableCopyOnRead,
ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean opScopeIsLocal) throws TimeoutException, CacheLoaderException {
- return get(key, aCallbackArgument, generateCallbacks, disableCopyOnRead, true, requestingClient, clientEvent, returnTombstones, opScopeIsLocal, true, false/* see GEODE-1291*/);
+ return get(key, aCallbackArgument, generateCallbacks, disableCopyOnRead, true, requestingClient, clientEvent, returnTombstones, opScopeIsLocal,
+ false /* see GEODE-1291*/);
}
/**
* @param opScopeIsLocal if true then just check local storage for a value; if false then try to find the value if it is not local
* @param retainResult if true then the result may be a retained off-heap reference.
*/
public Object get(Object key, Object aCallbackArgument,
- boolean generateCallbacks, boolean disableCopyOnRead, boolean preferCD,
- ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones,
- boolean opScopeIsLocal, boolean allowReadFromHDFS, boolean retainResult) throws TimeoutException, CacheLoaderException
+ boolean generateCallbacks, boolean disableCopyOnRead, boolean preferCD,
+ ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones,
+ boolean opScopeIsLocal, boolean retainResult) throws TimeoutException, CacheLoaderException
{
assert !retainResult || preferCD;
validateKey(key);
@@ -1440,7 +1411,8 @@ public class LocalRegion extends AbstractRegion
boolean isMiss = true;
try {
KeyInfo keyInfo = getKeyInfo(key, aCallbackArgument);
- Object value = getDataView().getDeserializedValue(keyInfo, this, true, disableCopyOnRead, preferCD, clientEvent, returnTombstones, allowReadFromHDFS, retainResult);
+ Object value = getDataView().getDeserializedValue(keyInfo, this, true, disableCopyOnRead, preferCD, clientEvent, returnTombstones,
+ retainResult);
final boolean isCreate = value == null;
isMiss = value == null || Token.isInvalid(value)
|| (!returnTombstones && value == Token.TOMBSTONE);
@@ -1453,13 +1425,13 @@ public class LocalRegion extends AbstractRegion
// if scope is local and there is no loader, then
// don't go further to try and get value
if (!opScopeIsLocal
- && ((getScope().isDistributed() && !isHDFSRegion())
+ && ((getScope().isDistributed())
|| hasServerProxy()
|| basicGetLoader() != null)) {
// serialize search/load threads if not in txn
value = getDataView().findObject(keyInfo,
this, isCreate, generateCallbacks, value, disableCopyOnRead,
- preferCD, requestingClient, clientEvent, returnTombstones, false/*allowReadFromHDFS*/);
+ preferCD, requestingClient, clientEvent, returnTombstones);
if (!returnTombstones && value == Token.TOMBSTONE) {
value = null;
}
@@ -1485,7 +1457,7 @@ public class LocalRegion extends AbstractRegion
*/
final public void recordMiss(final RegionEntry re, Object key) {
final RegionEntry e;
- if (re == null && !isTX() && !isHDFSRegion()) {
+ if (re == null && !isTX()) {
e = basicGetEntry(key);
} else {
e = re;
@@ -1494,60 +1466,30 @@ public class LocalRegion extends AbstractRegion
}
/**
- * @return true if this region has been configured for HDFS persistence
- */
- public boolean isHDFSRegion() {
- return false;
- }
-
- /**
- * @return true if this region is configured to read and write data from HDFS
- */
- public boolean isHDFSReadWriteRegion() {
- return false;
- }
-
- /**
- * @return true if this region is configured to only write to HDFS
- */
- protected boolean isHDFSWriteOnly() {
- return false;
- }
-
- /**
- * FOR TESTING ONLY
- */
- public HoplogListenerForRegion getHoplogListener() {
- return hoplogListener;
- }
-
- /**
- * FOR TESTING ONLY
- */
- public HdfsRegionManager getHdfsRegionManager() {
- return hdfsManager;
- }
-
- /**
* optimized to only allow one thread to do a search/load, other threads wait
* on a future
- *
- * @param keyInfo
+ * @param keyInfo
* @param p_isCreate
* true if call found no entry; false if updating an existing
* entry
* @param generateCallbacks
* @param p_localValue
- * the value retrieved from the region for this object.
+* the value retrieved from the region for this object.
* @param disableCopyOnRead if true then do not make a copy
* @param preferCD true if the preferred result form is CachedDeserializable
* @param clientEvent the client event, if any
* @param returnTombstones whether to return tombstones
*/
@Retained
- Object nonTxnFindObject(KeyInfo keyInfo, boolean p_isCreate,
- boolean generateCallbacks, Object p_localValue, boolean disableCopyOnRead, boolean preferCD,
- ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS)
+ Object nonTxnFindObject(KeyInfo keyInfo,
+ boolean p_isCreate,
+ boolean generateCallbacks,
+ Object p_localValue,
+ boolean disableCopyOnRead,
+ boolean preferCD,
+ ClientProxyMembershipID requestingClient,
+ EntryEventImpl clientEvent,
+ boolean returnTombstones)
throws TimeoutException, CacheLoaderException
{
final Object key = keyInfo.getKey();
@@ -1606,7 +1548,8 @@ public class LocalRegion extends AbstractRegion
try {
boolean partitioned = this.getDataPolicy().withPartitioning();
if (!partitioned) {
- localValue = getDeserializedValue(null, keyInfo, isCreate, disableCopyOnRead, preferCD, clientEvent, false, false/*allowReadFromHDFS*/, false);
+ localValue = getDeserializedValue(null, keyInfo, isCreate, disableCopyOnRead, preferCD, clientEvent, false,
+ false);
// stats have now been updated
if (localValue != null && !Token.isInvalid(localValue)) {
@@ -1615,7 +1558,7 @@ public class LocalRegion extends AbstractRegion
}
isCreate = localValue == null;
result = findObjectInSystem(keyInfo, isCreate, null, generateCallbacks,
- localValue, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, false/*allowReadFromHDFS*/);
+ localValue, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones);
} else {
@@ -1623,7 +1566,7 @@ public class LocalRegion extends AbstractRegion
// For PRs we don't want to deserialize the value and we can't use findObjectInSystem because
// it can invoke code that is transactional.
result = getSharedDataView().findObject(keyInfo, this, true/*isCreate*/, generateCallbacks,
- localValue, disableCopyOnRead, preferCD, null, null, false, allowReadFromHDFS);
+ localValue, disableCopyOnRead, preferCD, null, null, false);
// TODO why are we not passing the client event or returnTombstones in the above invokation?
}
@@ -1806,7 +1749,6 @@ public class LocalRegion extends AbstractRegion
public final EntryEventImpl newPutEntryEvent(Object key, Object value,
Object aCallbackArgument) {
EntryEventImpl ev = newUpdateEntryEvent(key, value, aCallbackArgument);
- ev.setFetchFromHDFS(false);
ev.setPutDML(true);
return ev;
}
@@ -1938,23 +1880,11 @@ public class LocalRegion extends AbstractRegion
}
}
- protected boolean includeHDFSResults() {
- return isUsedForPartitionedRegionBucket()
- && isHDFSReadWriteRegion()
- && getPartitionedRegion().includeHDFSResults();
- }
-
-
/** a fast estimate of total number of entries locally in the region */
public long getEstimatedLocalSize() {
RegionMap rm;
if (!this.isDestroyed) {
long size;
- if (isHDFSReadWriteRegion() && this.initialized) {
- // this size is not used by HDFS region iterators
- // fixes bug 49239
- return 0;
- }
// if region has not been initialized yet, then get the estimate from
// disk region's recovery map if available
if (!this.initialized && this.diskRegion != null
@@ -2266,9 +2196,6 @@ public class LocalRegion extends AbstractRegion
if (this.imageState.isClient() && !this.concurrencyChecksEnabled) {
return result - this.imageState.getDestroyedEntriesCount();
}
- if (includeHDFSResults()) {
- return result;
- }
return result - this.tombstoneCount.get();
}
}
@@ -3004,11 +2931,18 @@ public class LocalRegion extends AbstractRegion
* @param clientEvent the client's event, if any. If not null, we set the version tag
* @param returnTombstones TODO
* @return the deserialized value
- * @see DistributedRegion#findObjectInSystem(KeyInfo, boolean, TXStateInterface, boolean, Object, boolean, boolean, ClientProxyMembershipID, EntryEventImpl, boolean, boolean )
- */
- protected Object findObjectInSystem(KeyInfo keyInfo, boolean isCreate,
- TXStateInterface tx, boolean generateCallbacks, Object localValue, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient,
- EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS)
+ * @see LocalRegion#findObjectInSystem(KeyInfo, boolean, TXStateInterface, boolean, Object, boolean, boolean, ClientProxyMembershipID, EntryEventImpl, boolean)
+ */
+ protected Object findObjectInSystem(KeyInfo keyInfo,
+ boolean isCreate,
+ TXStateInterface tx,
+ boolean generateCallbacks,
+ Object localValue,
+ boolean disableCopyOnRead,
+ boolean preferCD,
+ ClientProxyMembershipID requestingClient,
+ EntryEventImpl clientEvent,
+ boolean returnTombstones)
throws CacheLoaderException, TimeoutException
{
final Object key = keyInfo.getKey();
@@ -5383,9 +5317,6 @@ public class LocalRegion extends AbstractRegion
// Notify bridge clients (if this is a BridgeServer)
event.setEventType(eventType);
notifyBridgeClients(event);
- if (this.hdfsStoreName != null) {
- notifyGatewaySender(eventType, event);
- }
if(callDispatchListenerEvent){
dispatchListenerEvent(eventType, event);
}
@@ -7271,24 +7202,8 @@ public class LocalRegion extends AbstractRegion
if (generateEventID()) {
event.setNewEventId(cache.getDistributedSystem());
}
- event.setFetchFromHDFS(false);
- return event;
- }
-
- @Retained
- protected EntryEventImpl generateCustomEvictDestroyEvent(final Object key) {
- @Retained EntryEventImpl event = EntryEventImpl.create(
- this, Operation.CUSTOM_EVICT_DESTROY, key, null/* newValue */,
- null, false, getMyId());
-
- // Fix for bug#36963
- if (generateEventID()) {
- event.setNewEventId(cache.getDistributedSystem());
- }
- event.setFetchFromHDFS(false);
return event;
}
-
/**
* @return true if the evict destroy was done; false if it was not needed
*/
@@ -9941,8 +9856,6 @@ public class LocalRegion extends AbstractRegion
}
}
- clearHDFSData();
-
if (!isProxy()) {
// Now we need to recreate all the indexes.
//If the indexManager is null we don't have to worry
@@ -9981,11 +9894,6 @@ public class LocalRegion extends AbstractRegion
}
}
- /**Clear HDFS data, if present */
- protected void clearHDFSData() {
- //do nothing, clear is implemented for subclasses like BucketRegion.
- }
-
@Override
void basicLocalClear(RegionEventImpl rEvent)
{
@@ -10762,7 +10670,6 @@ public class LocalRegion extends AbstractRegion
}
public final DistributedPutAllOperation newPutAllForPUTDmlOperation(Map<?, ?> map, Object callbackArg) {
DistributedPutAllOperation dpao = newPutAllOperation(map, callbackArg);
- dpao.getEvent().setFetchFromHDFS(false);
dpao.getEvent().setPutDML(true);
return dpao;
}
@@ -10818,7 +10725,6 @@ public class LocalRegion extends AbstractRegion
putallOp, this, Operation.PUTALL_CREATE, key, value);
try {
- event.setFetchFromHDFS(putallOp.getEvent().isFetchFromHDFS());
event.setPutDML(putallOp.getEvent().isPutDML());
if (tagHolder != null) {
@@ -12921,22 +12827,6 @@ public class LocalRegion extends AbstractRegion
public Integer getCountNotFoundInLocal() {
return countNotFoundInLocal.get();
}
- /// End of Variables and methods for test Hook for HDFS ///////
- public void forceHDFSCompaction(boolean isMajor, Integer maxWaitTime) {
- throw new UnsupportedOperationException(
- LocalizedStrings.HOPLOG_DOES_NOT_USE_HDFSSTORE
- .toLocalizedString(getName()));
- }
-
- public void flushHDFSQueue(int maxWaitTime) {
- throw new UnsupportedOperationException(
- LocalizedStrings.HOPLOG_DOES_NOT_USE_HDFSSTORE
- .toLocalizedString(getName()));
- }
-
- public long lastMajorHDFSCompaction() {
- throw new UnsupportedOperationException();
- }
public static void simulateClearForTests(boolean flag) {
simulateClearForTests = flag;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegionDataView.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegionDataView.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegionDataView.java
index 5193a17..c26ff10 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegionDataView.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegionDataView.java
@@ -17,7 +17,6 @@
package com.gemstone.gemfire.internal.cache;
import java.util.Collection;
-import java.util.Iterator;
import java.util.Set;
import com.gemstone.gemfire.cache.EntryNotFoundException;
@@ -36,9 +35,16 @@ public class LocalRegionDataView implements InternalDataView {
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.InternalDataView#getDeserializedValue(java.lang.Object, com.gemstone.gemfire.internal.cache.LocalRegion, boolean)
*/
- public Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion,
- boolean updateStats, boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadsFromHDFS, boolean retainResult) {
- return localRegion.getDeserializedValue(null, keyInfo, updateStats, disableCopyOnRead, preferCD, clientEvent, returnTombstones, allowReadsFromHDFS, retainResult);
+ public Object getDeserializedValue(KeyInfo keyInfo,
+ LocalRegion localRegion,
+ boolean updateStats,
+ boolean disableCopyOnRead,
+ boolean preferCD,
+ EntryEventImpl clientEvent,
+ boolean returnTombstones,
+ boolean retainResult) {
+ return localRegion.getDeserializedValue(null, keyInfo, updateStats, disableCopyOnRead, preferCD, clientEvent, returnTombstones,
+ retainResult);
}
/* (non-Javadoc)
@@ -136,9 +142,17 @@ public class LocalRegionDataView implements InternalDataView {
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.InternalDataView#findObject(com.gemstone.gemfire.internal.cache.LocalRegion, java.lang.Object, java.lang.Object, boolean, boolean, java.lang.Object)
*/
- public Object findObject(KeyInfo keyInfo, LocalRegion r, boolean isCreate,
- boolean generateCallbacks, Object value, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) {
- return r.nonTxnFindObject(keyInfo, isCreate, generateCallbacks, value, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, allowReadFromHDFS);
+ public Object findObject(KeyInfo keyInfo,
+ LocalRegion r,
+ boolean isCreate,
+ boolean generateCallbacks,
+ Object value,
+ boolean disableCopyOnRead,
+ boolean preferCD,
+ ClientProxyMembershipID requestingClient,
+ EntryEventImpl clientEvent,
+ boolean returnTombstones) {
+ return r.nonTxnFindObject(keyInfo, isCreate, generateCallbacks, value, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones);
}
/* (non-Javadoc)
@@ -180,7 +194,12 @@ public class LocalRegionDataView implements InternalDataView {
* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.InternalDataView#getSerializedValue(com.gemstone.gemfire.internal.cache.BucketRegion, java.lang.Object, java.lang.Object)
*/
- public Object getSerializedValue(LocalRegion localRegion, KeyInfo key, boolean doNotLockEntry, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws DataLocationException {
+ public Object getSerializedValue(LocalRegion localRegion,
+ KeyInfo key,
+ boolean doNotLockEntry,
+ ClientProxyMembershipID requestingClient,
+ EntryEventImpl clientEvent,
+ boolean returnTombstones) throws DataLocationException {
throw new IllegalStateException();
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/NonLocalRegionEntry.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/NonLocalRegionEntry.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/NonLocalRegionEntry.java
index bb83383..4c1fa7f 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/NonLocalRegionEntry.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/NonLocalRegionEntry.java
@@ -461,26 +461,6 @@ public class NonLocalRegionEntry implements RegionEntry, VersionStamp {
}
@Override
- public boolean isMarkedForEviction() {
- throw new UnsupportedOperationException(LocalizedStrings
- .PartitionedRegion_NOT_APPROPRIATE_FOR_PARTITIONEDREGIONNONLOCALREGIONENTRY
- .toLocalizedString());
- }
- @Override
- public void setMarkedForEviction() {
- throw new UnsupportedOperationException(LocalizedStrings
- .PartitionedRegion_NOT_APPROPRIATE_FOR_PARTITIONEDREGIONNONLOCALREGIONENTRY
- .toLocalizedString());
- }
-
- @Override
- public void clearMarkedForEviction() {
- throw new UnsupportedOperationException(LocalizedStrings
- .PartitionedRegion_NOT_APPROPRIATE_FOR_PARTITIONEDREGIONNONLOCALREGIONENTRY
- .toLocalizedString());
- }
-
- @Override
public boolean isValueNull() {
return (null == getValueAsToken());
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/Oplog.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/Oplog.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/Oplog.java
index fe8813e..4728594 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/Oplog.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/Oplog.java
@@ -7384,19 +7384,6 @@ public final class Oplog implements CompactableOplog, Flushable {
// TODO Auto-generated method stub
}
@Override
- public boolean isMarkedForEviction() {
- // TODO Auto-generated method stub
- return false;
- }
- @Override
- public void setMarkedForEviction() {
- // TODO Auto-generated method stub
- }
- @Override
- public void clearMarkedForEviction() {
- // TODO Auto-generated method stub
- }
- @Override
public boolean isInvalid() {
// TODO Auto-generated method stub
return false;