You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ag...@apache.org on 2016/05/03 23:45:17 UTC
[03/60] [abbrv] incubator-geode git commit: GEODE-1072: Removing HDFS
related code
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/DelegatingSerializedComparator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/DelegatingSerializedComparator.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/DelegatingSerializedComparator.java
deleted file mode 100644
index 52470d0..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/DelegatingSerializedComparator.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SerializedComparator;
-
-/**
- * Delegates object comparisons to one or more embedded comparators.
- *
- */
-public interface DelegatingSerializedComparator extends SerializedComparator {
- /**
- * Injects the embedded comparators.
- * @param comparators the comparators for delegation
- */
- void setComparators(SerializedComparator[] comparators);
-
- /**
- * Returns the embedded comparators.
- * @return the comparators
- */
- SerializedComparator[] getComparators();
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/HFileStoreStatistics.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/HFileStoreStatistics.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/HFileStoreStatistics.java
deleted file mode 100644
index fdf3852..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/HFileStoreStatistics.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import static com.gemstone.gemfire.distributed.internal.DistributionStats.getStatTime;
-
-import com.gemstone.gemfire.StatisticDescriptor;
-import com.gemstone.gemfire.Statistics;
-import com.gemstone.gemfire.StatisticsFactory;
-import com.gemstone.gemfire.StatisticsType;
-import com.gemstone.gemfire.StatisticsTypeFactory;
-import com.gemstone.gemfire.internal.DummyStatisticsFactory;
-import com.gemstone.gemfire.internal.StatisticsTypeFactoryImpl;
-
-public class HFileStoreStatistics {
- private final Statistics stats;
-
- private final CacheOperation blockCache;
-
- public HFileStoreStatistics(String typeName, String name) {
- this(new DummyStatisticsFactory(), typeName, name);
- }
-
- public HFileStoreStatistics(StatisticsFactory factory, String typeName, String name) {
- StatisticsTypeFactory tf = StatisticsTypeFactoryImpl.singleton();
-
- StatisticDescriptor bcMisses = tf.createLongCounter("blockCacheMisses", "The total number of block cache misses", "misses");
- StatisticDescriptor bcHits = tf.createLongCounter("blockCacheHits", "The total number of block cache hits", "hits");
- StatisticDescriptor bcCached = tf.createLongGauge("blocksCached", "The current number of cached blocks", "blocks");
- StatisticDescriptor bcBytesCached = tf.createLongGauge("blockBytesCached", "The current number of bytes cached", "bytes");
- StatisticDescriptor bcBytesEvicted = tf.createLongCounter("blockBytesEvicted", "The total number of bytes cached", "bytes");
-
-
- StatisticsType type = tf.createType(typeName,
- "Statistics about structured I/O operations for a region", new StatisticDescriptor[] {
- bcMisses, bcHits, bcCached, bcBytesCached, bcBytesEvicted
- });
-
- blockCache = new CacheOperation(bcMisses.getId(), bcHits.getId(), bcCached.getId(), bcBytesCached.getId(), bcBytesEvicted.getId());
-
-
- stats = factory.createAtomicStatistics(type, name);
- }
-
- public void close() {
- stats.close();
- }
-
- public Statistics getStats() {
- return stats;
- }
-
- public CacheOperation getBlockCache() {
- return blockCache;
- }
-
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("blockCache = {").append(blockCache).append("}\n");
-
- return sb.toString();
- }
-
- public class TimedOperation {
- protected final int countId;
- protected final int inProgressId;
- protected final int timeId;
- private final int errorsId;
-
- public TimedOperation(int count, int inProgress, int time, int errors) {
- this.countId = count;
- this.inProgressId = inProgress;
- this.timeId = time;
- this.errorsId = errors;
- }
-
- public long begin() {
- stats.incLong(inProgressId, 1);
- return getStatTime();
- }
-
- public long end(long start) {
- stats.incLong(inProgressId, -1);
- stats.incLong(countId, 1);
- stats.incLong(timeId, getStatTime() - start);
- return getStatTime();
- }
-
- public void error(long start) {
- end(start);
- stats.incLong(errorsId, 1);
- }
-
- public long getCount() {
- return stats.getLong(countId);
- }
-
- public long getInProgress() {
- return stats.getLong(inProgressId);
- }
-
- public long getTime() {
- return stats.getLong(timeId);
- }
-
- public long getErrors() {
- return stats.getLong(errorsId);
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("count=").append(getCount());
- sb.append(";inProgress=").append(getInProgress());
- sb.append(";errors=").append(getErrors());
- sb.append(";time=").append(getTime());
-
- return sb.toString();
- }
- }
-
- public class CacheOperation {
- private final int missesId;
- private final int hitsId;
- private final int cachedId;
- private final int bytesCachedId;
- private final int bytesEvictedId;
-
- public CacheOperation(int missesId, int hitsId, int cachedId,
- int bytesCachedId, int bytesEvictedId) {
- this.missesId = missesId;
- this.hitsId = hitsId;
- this.cachedId = cachedId;
- this.bytesCachedId = bytesCachedId;
- this.bytesEvictedId = bytesEvictedId;
- }
-
- public void store(long bytes) {
- stats.incLong(cachedId, 1);
- stats.incLong(bytesCachedId, bytes);
- }
-
- public void evict(long bytes) {
- stats.incLong(cachedId, -1);
- stats.incLong(bytesCachedId, -bytes);
- stats.incLong(bytesEvictedId, bytes);
- }
-
- public void hit() {
- stats.incLong(hitsId, 1);
- }
-
- public void miss() {
- stats.incLong(missesId, 1);
- }
-
- public long getMisses() {
- return stats.getLong(missesId);
- }
-
- public long getHits() {
- return stats.getLong(hitsId);
- }
-
- public long getCached() {
- return stats.getLong(cachedId);
- }
-
- public long getBytesCached() {
- return stats.getLong(bytesCachedId);
- }
-
- public long getBytesEvicted() {
- return stats.getLong(bytesEvictedId);
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("misses=").append(getMisses());
- sb.append(";hits=").append(getHits());
- sb.append(";cached=").append(getCached());
- sb.append(";bytesCached=").append(getBytesCached());
- sb.append(";bytesEvicted=").append(getBytesEvicted());
-
- return sb.toString();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/KeyValueIterator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/KeyValueIterator.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/KeyValueIterator.java
deleted file mode 100644
index df7e1ac..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/KeyValueIterator.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.util.Iterator;
-
-/**
- * Provides an {@link Iterator} view over a collection of keys and values. The
- * implementor must provide access to the current key/value as well as a means
- * to move to the next pair.
- *
- *
- * @param <K> the key type
- * @param <V> the value type
- */
-public interface KeyValueIterator<K, V> extends Iterator<K> {
- /**
- * Returns the key at the current position.
- * @return the key
- */
- public K key();
-
- /**
- * Returns the value at the current position.
- * @return the value
- */
- public abstract V value();
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogStatistics.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogStatistics.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogStatistics.java
deleted file mode 100644
index 35baafb..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogStatistics.java
+++ /dev/null
@@ -1,505 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import static com.gemstone.gemfire.distributed.internal.DistributionStats.getStatTime;
-
-import com.gemstone.gemfire.StatisticDescriptor;
-import com.gemstone.gemfire.Statistics;
-import com.gemstone.gemfire.StatisticsFactory;
-import com.gemstone.gemfire.StatisticsType;
-import com.gemstone.gemfire.StatisticsTypeFactory;
-import com.gemstone.gemfire.internal.DummyStatisticsFactory;
-import com.gemstone.gemfire.internal.StatisticsTypeFactoryImpl;
-
-public class SortedOplogStatistics {
- private final Statistics stats;
-
- private final IOOperation read;
- private final ScanOperation scan;
- private final IOOperation write;
- private final IOOperation put;
- private final IOOperation flush;
- private final IOOperation minorCompaction;
- private final IOOperation majorCompaction;
- private final BloomOperation bloom;
- private final TimedOperation clear;
- private final TimedOperation destroy;
-
- private final IOOperation blockRead;
- private final CacheOperation blockCache;
-
- private final int activeFilesId;
- private final int inactiveFilesId;
- private final int activeReadersId;
-
- private final int storeUsageBytesId;
-
- public SortedOplogStatistics(String typeName, String name) {
- this(new DummyStatisticsFactory(), typeName, name);
- }
-
- public SortedOplogStatistics(StatisticsFactory factory, String typeName, String name) {
- StatisticsTypeFactory tf = StatisticsTypeFactoryImpl.singleton();
-
- StatisticDescriptor readCount = tf.createLongCounter("reads", "The total number of read operations", "ops");
- StatisticDescriptor readInProgress = tf.createLongGauge("readsInProgress", "The number of read operations in progress", "ops");
- StatisticDescriptor readTime = tf.createLongCounter("readTime", "The total time spent reading from disk", "nanoseconds");
- StatisticDescriptor readBytes = tf.createLongCounter("readBytes", "The total number of bytes read from disk", "bytes");
- StatisticDescriptor readErrors = tf.createLongCounter("readErrors", "The total number of read errors", "errors");
-
- StatisticDescriptor scanCount = tf.createLongCounter("scans", "The total number of scan operations", "ops");
- StatisticDescriptor scanInProgress = tf.createLongGauge("scansInProgress", "The number of scan operations in progress", "ops");
- StatisticDescriptor scanTime = tf.createLongCounter("scanTime", "The total time scanner was operational", "nanoseconds");
- StatisticDescriptor scanBytes = tf.createLongCounter("scanBytes", "The total number of bytes scanned from disk", "bytes");
- StatisticDescriptor scanErrors = tf.createLongCounter("scanErrors", "The total number of scan errors", "errors");
- StatisticDescriptor scanIterations = tf.createLongCounter("scanIterations", "The total number of scan iterations", "ops");
- StatisticDescriptor scanIterationTime = tf.createLongCounter("scanIterationTime", "The total time spent scanning from persistence layer", "nanoseconds");
-
- StatisticDescriptor writeCount = tf.createLongCounter("writes", "The total number of write operations", "ops");
- StatisticDescriptor writeInProgress = tf.createLongGauge("writesInProgress", "The number of write operations in progress", "ops");
- StatisticDescriptor writeTime = tf.createLongCounter("writeTime", "The total time spent writing to disk", "nanoseconds");
- StatisticDescriptor writeBytes = tf.createLongCounter("writeBytes", "The total number of bytes written to disk", "bytes");
- StatisticDescriptor writeErrors = tf.createLongCounter("writeErrors", "The total number of write errors", "errors");
-
- StatisticDescriptor putCount = tf.createLongCounter("puts", "The total number of put operations", "ops");
- StatisticDescriptor putInProgress = tf.createLongGauge("putsInProgress", "The number of put operations in progress", "ops");
- StatisticDescriptor putTime = tf.createLongCounter("putTime", "The total time spent in put calls", "nanoseconds");
- StatisticDescriptor putBytes = tf.createLongCounter("putBytes", "The total number of bytes put", "bytes");
- StatisticDescriptor putErrors = tf.createLongCounter("putErrors", "The total number of put errors", "errors");
-
- StatisticDescriptor flushCount = tf.createLongCounter("flushes", "The total number of flush operations", "ops");
- StatisticDescriptor flushInProgress = tf.createLongGauge("flushesInProgress", "The number of flush operations in progress", "ops");
- StatisticDescriptor flushTime = tf.createLongCounter("flushTime", "The total time spent flushing to disk", "nanoseconds");
- StatisticDescriptor flushBytes = tf.createLongCounter("flushBytes", "The total number of bytes flushed to disk", "bytes");
- StatisticDescriptor flushErrors = tf.createLongCounter("flushErrors", "The total number of flush errors", "errors");
-
- StatisticDescriptor minorCompactionCount = tf.createLongCounter("minorCompactions", "The total number of minor compaction operations", "ops");
- StatisticDescriptor minorCompactionInProgress = tf.createLongGauge("minorCompactionsInProgress", "The number of minor compaction operations in progress", "ops");
- StatisticDescriptor minorCompactionTime = tf.createLongCounter("minorCompactionTime", "The total time spent in minor compactions", "nanoseconds");
- StatisticDescriptor minorCompactionBytes = tf.createLongCounter("minorCompactionBytes", "The total number of bytes collected during minor compactions", "bytes");
- StatisticDescriptor minorCompactionErrors = tf.createLongCounter("minorCompactionErrors", "The total number of minor compaction errors", "errors");
-
- StatisticDescriptor majorCompactionCount = tf.createLongCounter("majorCompactions", "The total number of major compaction operations", "ops");
- StatisticDescriptor majorCompactionInProgress = tf.createLongGauge("majorCompactionsInProgress", "The number of major compaction operations in progress", "ops");
- StatisticDescriptor majorCompactionTime = tf.createLongCounter("majorCompactionTime", "The total time spent in major compactions", "nanoseconds");
- StatisticDescriptor majorCompactionBytes = tf.createLongCounter("majorCompactionBytes", "The total number of bytes collected during major compactions", "bytes");
- StatisticDescriptor majorCompactionErrors = tf.createLongCounter("majorCompactionErrors", "The total number of major compaction errors", "errors");
-
- StatisticDescriptor bloomCount = tf.createLongCounter("bloomFilterCheck", "The total number of Bloom Filter checks", "ops");
- StatisticDescriptor bloomInProgress = tf.createLongGauge("bloomFilterChecksInProgress", "The number of Bloom Filter checks in progress", "ops");
- StatisticDescriptor bloomTime = tf.createLongCounter("bloomFilterCheckTime", "The total time spent checking the Bloom Filter", "nanoseconds");
- StatisticDescriptor bloomErrors = tf.createLongCounter("bloomFilterErrors", "The total number of Bloom Filter errors", "errors");
- StatisticDescriptor bloomFalsePositive = tf.createLongCounter("bloomFilterFalsePositives", "The total number of Bloom Filter false positives", "false positives");
-
- StatisticDescriptor clearCount = tf.createLongCounter("clears", "The total number of clear operations", "ops");
- StatisticDescriptor clearInProgress = tf.createLongGauge("clearsInProgress", "The number of clear operations in progress", "ops");
- StatisticDescriptor clearTime = tf.createLongCounter("clearTime", "The total time spent in clear operations", "nanoseconds");
- StatisticDescriptor clearErrors = tf.createLongGauge("clearErrors", "The total number of clear errors", "errors");
-
- StatisticDescriptor destroyCount = tf.createLongCounter("destroys", "The total number of destroy operations", "ops");
- StatisticDescriptor destroyInProgress = tf.createLongGauge("destroysInProgress", "The number of destroy operations in progress", "ops");
- StatisticDescriptor destroyTime = tf.createLongCounter("destroyTime", "The total time spent in destroy operations", "nanoseconds");
- StatisticDescriptor destroyErrors = tf.createLongGauge("destroyErrors", "The total number of destroy errors", "errors");
-
- StatisticDescriptor brCount = tf.createLongCounter("blockReads", "The total number of block read operations", "ops");
- StatisticDescriptor brInProgress = tf.createLongGauge("blockReadsInProgress", "The number of block read operations in progress", "ops");
- StatisticDescriptor brTime = tf.createLongCounter("blockReadTime", "The total time spent reading blocks from disk", "nanoseconds");
- StatisticDescriptor brBytes = tf.createLongCounter("blockReadBytes", "The total number of block bytes read from disk", "bytes");
- StatisticDescriptor brErrors = tf.createLongCounter("blockReadErrors", "The total number of block read errors", "errors");
-
- StatisticDescriptor bcMisses = tf.createLongCounter("blockCacheMisses", "The total number of block cache misses", "misses");
- StatisticDescriptor bcHits = tf.createLongCounter("blockCacheHits", "The total number of block cache hits", "hits");
- StatisticDescriptor bcCached = tf.createLongGauge("blocksCached", "The current number of cached blocks", "blocks");
- StatisticDescriptor bcBytesCached = tf.createLongGauge("blockBytesCached", "The current number of bytes cached", "bytes");
- StatisticDescriptor bcBytesEvicted = tf.createLongCounter("blockBytesEvicted", "The total number of bytes cached", "bytes");
-
- StatisticDescriptor activeFileCount = tf.createLongGauge("activeFileCount", "The total number of active files", "files");
- StatisticDescriptor inactiveFileCount = tf.createLongGauge("inactiveFileCount", "The total number of inactive files", "files");
- StatisticDescriptor activeReaderCount = tf.createLongGauge("activeReaderCount", "The total number of active file readers", "files");
-
- StatisticDescriptor storeUsageBytes = tf.createLongGauge("storeUsageBytes", "The total volume occupied on persistent store", "bytes");
-
- StatisticsType type = tf.createType(typeName,
- "Statistics about structured I/O operations for a region", new StatisticDescriptor[] {
- readCount, readInProgress, readTime, readBytes, readErrors,
- scanCount, scanInProgress, scanTime, scanBytes, scanErrors, scanIterations, scanIterationTime,
- writeCount, writeInProgress, writeTime, writeBytes, writeErrors,
- putCount, putInProgress, putTime, putBytes, putErrors,
- flushCount, flushInProgress, flushTime, flushBytes, flushErrors,
- minorCompactionCount, minorCompactionInProgress, minorCompactionTime, minorCompactionBytes, minorCompactionErrors,
- majorCompactionCount, majorCompactionInProgress, majorCompactionTime, majorCompactionBytes, majorCompactionErrors,
- bloomCount, bloomInProgress, bloomTime, bloomErrors, bloomFalsePositive,
- clearCount, clearInProgress, clearTime, clearErrors,
- destroyCount, destroyInProgress, destroyTime, destroyErrors,
- brCount, brInProgress, brTime, brBytes, brErrors,
- bcMisses, bcHits, bcCached, bcBytesCached, bcBytesEvicted,
- activeFileCount, inactiveFileCount, activeReaderCount, storeUsageBytes
- });
-
- read = new IOOperation(readCount.getId(), readInProgress.getId(), readTime.getId(), readBytes.getId(), readErrors.getId());
- scan = new ScanOperation(scanCount.getId(), scanInProgress.getId(), scanTime.getId(), scanBytes.getId(), scanErrors.getId(), scanIterations.getId(), scanIterationTime.getId());
- write = new IOOperation(writeCount.getId(), writeInProgress.getId(), writeTime.getId(), writeBytes.getId(), writeErrors.getId());
- put = new IOOperation(putCount.getId(), putInProgress.getId(), putTime.getId(), putBytes.getId(), putErrors.getId());
- flush = new IOOperation(flushCount.getId(), flushInProgress.getId(), flushTime.getId(), flushBytes.getId(), flushErrors.getId());
- minorCompaction = new IOOperation(minorCompactionCount.getId(), minorCompactionInProgress.getId(), minorCompactionTime.getId(), minorCompactionBytes.getId(), minorCompactionErrors.getId());
- majorCompaction = new IOOperation(majorCompactionCount.getId(), majorCompactionInProgress.getId(), majorCompactionTime.getId(), majorCompactionBytes.getId(), majorCompactionErrors.getId());
- bloom = new BloomOperation(bloomCount.getId(), bloomInProgress.getId(), bloomTime.getId(), bloomErrors.getId(), bloomFalsePositive.getId());
- clear = new TimedOperation(clearCount.getId(), clearInProgress.getId(), clearTime.getId(), clearErrors.getId());
- destroy = new TimedOperation(destroyCount.getId(), destroyInProgress.getId(), destroyTime.getId(), destroyErrors.getId());
-
- blockRead = new IOOperation(brCount.getId(), brInProgress.getId(), brTime.getId(), brBytes.getId(), brErrors.getId());
- blockCache = new CacheOperation(bcMisses.getId(), bcHits.getId(), bcCached.getId(), bcBytesCached.getId(), bcBytesEvicted.getId());
-
- activeFilesId = activeFileCount.getId();
- inactiveFilesId = inactiveFileCount.getId();
- activeReadersId = activeReaderCount.getId();
- storeUsageBytesId = storeUsageBytes.getId();
-
- stats = factory.createAtomicStatistics(type, name);
- }
-
- public void close() {
- stats.close();
- }
-
- public Statistics getStats() {
- return stats;
- }
-
- public IOOperation getRead() {
- return read;
- }
-
- public ScanOperation getScan() {
- return scan;
- }
-
- public IOOperation getWrite() {
- return write;
- }
-
- public IOOperation getPut() {
- return put;
- }
-
- public IOOperation getFlush() {
- return flush;
- }
-
- public IOOperation getMinorCompaction() {
- return minorCompaction;
- }
-
- public IOOperation getMajorCompaction() {
- return majorCompaction;
- }
-
- public BloomOperation getBloom() {
- return bloom;
- }
-
- public TimedOperation getClear() {
- return clear;
- }
-
- public TimedOperation getDestroy() {
- return destroy;
- }
-
- public IOOperation getBlockRead() {
- return blockRead;
- }
-
- public CacheOperation getBlockCache() {
- return blockCache;
- }
-
- public long getActiveFileCount() {
- return stats.getLong(activeFilesId);
- }
-
- public long getInactiveFileCount() {
- return stats.getLong(inactiveFilesId);
- }
-
- public long getActiveReaderCount() {
- return stats.getLong(activeReadersId);
- }
-
- public void incActiveFiles(int amt) {
- stats.incLong(activeFilesId, amt);
- assert stats.getLong(activeFilesId) >= 0;
- }
-
- public void incInactiveFiles(int amt) {
- stats.incLong(inactiveFilesId, amt);
- assert stats.getLong(inactiveFilesId) >= 0;
- }
-
- public void incActiveReaders(int amt) {
- stats.incLong(activeReadersId, amt);
- assert stats.getLong(activeReadersId) >= 0;
- }
-
- public long getStoreUsageBytes() {
- return stats.getLong(storeUsageBytesId);
- }
-
- public void incStoreUsageBytes(long amt) {
- stats.incLong(storeUsageBytesId, amt);
- assert stats.getLong(storeUsageBytesId) >= 0;
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("read = {").append(read).append("}\n");
- sb.append("scan = {").append(scan).append("}\n");
- sb.append("write = {").append(write).append("}\n");
- sb.append("put = {").append(put).append("}\n");
- sb.append("flush = {").append(flush).append("}\n");
- sb.append("minorCompaction = {").append(minorCompaction).append("}\n");
- sb.append("majorCompaction = {").append(majorCompaction).append("}\n");
- sb.append("bloom = {").append(bloom).append("}\n");
- sb.append("clear = {").append(clear).append("}\n");
- sb.append("destroy = {").append(destroy).append("}\n");
- sb.append("blockRead = {").append(blockRead).append("}\n");
- sb.append("blockCache = {").append(blockCache).append("}\n");
- sb.append("activeFiles = ").append(stats.getLong(activeFilesId)).append("\n");
- sb.append("inactiveFiles = ").append(stats.getLong(inactiveFilesId)).append("\n");
- sb.append("activeReaders = ").append(stats.getLong(activeReadersId)).append("\n");
- sb.append("storeUsageBytes = ").append(stats.getLong(storeUsageBytesId)).append("\n");
-
- return sb.toString();
- }
-
- public class TimedOperation {
- protected final int countId;
- protected final int inProgressId;
- protected final int timeId;
- private final int errorsId;
-
- public TimedOperation(int count, int inProgress, int time, int errors) {
- this.countId = count;
- this.inProgressId = inProgress;
- this.timeId = time;
- this.errorsId = errors;
- }
-
- public long begin() {
- stats.incLong(inProgressId, 1);
- return getStatTime();
- }
-
- public long end(long start) {
- stats.incLong(inProgressId, -1);
- stats.incLong(countId, 1);
- stats.incLong(timeId, getStatTime() - start);
- return getStatTime();
- }
-
- public void error(long start) {
- end(start);
- stats.incLong(errorsId, 1);
- }
-
- public long getCount() {
- return stats.getLong(countId);
- }
-
- public long getInProgress() {
- return stats.getLong(inProgressId);
- }
-
- public long getTime() {
- return stats.getLong(timeId);
- }
-
- public long getErrors() {
- return stats.getLong(errorsId);
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("count=").append(getCount());
- sb.append(";inProgress=").append(getInProgress());
- sb.append(";errors=").append(getErrors());
- sb.append(";time=").append(getTime());
-
- return sb.toString();
- }
- }
-
- public class IOOperation extends TimedOperation {
- protected final int bytesId;
-
- public IOOperation(int count, int inProgress, int time, int bytes, int errors) {
- super(count, inProgress, time, errors);
- this.bytesId = bytes;
- }
-
- public long end(long bytes, long start) {
- stats.incLong(bytesId, bytes);
- return super.end(start);
- }
-
- public long getBytes() {
- return stats.getLong(bytesId);
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder(super.toString());
- sb.append(";bytes=").append(getBytes());
-
- return sb.toString();
- }
- }
-
- public class ScanOperation extends IOOperation {
- private final int iterationsId;
- private final int iterationTimeId;
-
- public ScanOperation(int count, int inProgress, int time, int bytes, int errors, int iterCount, int iterTime) {
- super(count, inProgress, time, bytes, errors);
- iterationsId = iterCount;
- iterationTimeId = iterTime;
- }
-
- public long beginIteration() {
- return getStatTime();
- }
-
- public void endIteration(long bytes, long start){
- stats.incLong(iterationsId, 1);
- stats.incLong(bytesId, bytes);
- stats.incLong(iterationTimeId, getStatTime() - start);
- }
-
- public long getIterations() {
- return stats.getLong(iterationsId);
- }
-
- public long getIterationTime() {
- return stats.getLong(iterationTimeId);
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder(super.toString());
- sb.append(";iterations=").append(getIterations());
- sb.append(";iterationTime=").append(getIterationTime());
-
- return sb.toString();
- }
- }
-
- public class BloomOperation extends TimedOperation {
- private final int falsePositiveId;
-
- public BloomOperation(int count, int inProgress, int time, int errors, int falsePositive) {
- super(count, inProgress, time, errors);
- this.falsePositiveId = falsePositive;
- }
-
- public void falsePositive() {
- stats.incLong(falsePositiveId, 1);
- }
-
- public long getFalsePositives() {
- return stats.getLong(falsePositiveId);
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder(super.toString());
- sb.append(";falsePositives=").append(getFalsePositives());
-
- return sb.toString();
- }
- }
-
- public class CacheOperation {
- private final int missesId;
- private final int hitsId;
- private final int cachedId;
- private final int bytesCachedId;
- private final int bytesEvictedId;
-
- public CacheOperation(int missesId, int hitsId, int cachedId,
- int bytesCachedId, int bytesEvictedId) {
- this.missesId = missesId;
- this.hitsId = hitsId;
- this.cachedId = cachedId;
- this.bytesCachedId = bytesCachedId;
- this.bytesEvictedId = bytesEvictedId;
- }
-
- public void store(long bytes) {
- stats.incLong(cachedId, 1);
- stats.incLong(bytesCachedId, bytes);
- }
-
- public void evict(long bytes) {
- stats.incLong(cachedId, -1);
- stats.incLong(bytesCachedId, -bytes);
- stats.incLong(bytesEvictedId, bytes);
- }
-
- public void hit() {
- stats.incLong(hitsId, 1);
- }
-
- public void miss() {
- stats.incLong(missesId, 1);
- }
-
- public long getMisses() {
- return stats.getLong(missesId);
- }
-
- public long getHits() {
- return stats.getLong(hitsId);
- }
-
- public long getCached() {
- return stats.getLong(cachedId);
- }
-
- public long getBytesCached() {
- return stats.getLong(bytesCachedId);
- }
-
- public long getBytesEvicted() {
- return stats.getLong(bytesEvictedId);
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("misses=").append(getMisses());
- sb.append(";hits=").append(getHits());
- sb.append(";cached=").append(getCached());
- sb.append(";bytesCached=").append(getBytesCached());
- sb.append(";bytesEvicted=").append(getBytesEvicted());
-
- return sb.toString();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedReader.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedReader.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedReader.java
deleted file mode 100644
index 1042e22..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedReader.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Comparator;
-import org.apache.hadoop.io.RawComparator;
-
-/**
- * Defines a means to read sorted data including performing range scans.
- *
- * @param <V> type of value returned by the sorted reader
- *
- */
-public interface SortedReader<V> extends Closeable {
- /**
- * Defines the names of additional data that may be associated with a sorted
- * reader.
- */
- public enum Metadata {
- /** identifies the disk store associated with the soplog, optional */
- DISK_STORE,
-
- /** identifies the RVV data, optional */
- RVV;
-
- /**
- * Converts the metadata name to bytes.
- * @return the bytes
- */
- public byte[] bytes() {
- return ("gemfire." + name()).getBytes();
- }
- }
-
- /**
- * Filters data based on metadata values.
- */
- public interface MetadataFilter {
- /**
- * Returns the name this filter acts upon.
- * @return the name
- */
- Metadata getName();
-
- /**
- * Returns true if the metadata value passes the filter.
- * @param value the value to check; may be null if the metadata value does
- * not exist or has not been assigned yet
- * @return true if accepted
- */
- boolean accept(byte[] value);
- }
-
- /**
- * Allows comparisons between serialized objects.
- */
- public interface SerializedComparator extends RawComparator<byte[]> {
- }
-
- /**
- * Allows sorted iteration through a set of keys and values.
- */
- public interface SortedIterator<V> extends KeyValueIterator<ByteBuffer, V> {
- /**
- * Closes the iterator and frees any retained resources.
- */
- public abstract void close();
- }
-
- /**
- * Defines the statistics available on a sorted file.
- */
- public interface SortedStatistics {
- /**
- * Returns the number of keys in the file.
- * @return the key count
- */
- long keyCount();
-
- /**
- * Returns the first key in the file.
- * @return the first key
- */
- byte[] firstKey();
-
- /**
- * Returns the last key in the file.
- * @return the last key
- */
- byte[] lastKey();
-
- /**
- * Returns the average key size in bytes.
- * @return the average key size
- */
- double avgKeySize();
-
- /**
- * Returns the average value size in bytes.
- * @return the average value size
- */
- double avgValueSize();
-
- /**
- * Frees any resources held by for statistics generation.
- */
- void close();
- }
-
- /**
- * Returns true if the bloom filter might contain the supplied key. The
- * nature of the bloom filter is such that false positives are allowed, but
- * false negatives cannot occur.
- *
- * @param key the key to test
- * @return true if the key might be present
- * @throws IOException read error
- */
- boolean mightContain(byte[] key) throws IOException;
-
- /**
- * Returns the value associated with the given key.
- *
- * @param key the key
- * @return the value, or null if the key is not found
- * @throws IOException read error
- */
- V read(byte[] key) throws IOException;
-
- /**
- * Iterates from the first key in the file to the requested key.
- * @param to the ending key
- * @param inclusive true if the ending key is included in the iteration
- * @return the sorted iterator
- * @throws IOException scan error
- */
- SortedIterator<V> head(byte[] to, boolean inclusive) throws IOException;
-
- /**
- * Iterates from the requested key to the last key in the file.
- * @param from the starting key
- * @param inclusive true if the starting key should be included in the iteration
- * @return the sorted iterator
- * @throws IOException scan error
- */
- SortedIterator<V> tail(byte[] from, boolean inclusive) throws IOException;
-
- /**
- * Iterators over the entire contents of the sorted file.
- *
- * @return the sorted iterator
- * @throws IOException scan error
- */
- SortedIterator<V> scan() throws IOException;
-
- /**
- * Scans the available keys and allows iteration over the interval [from, to)
- * where the starting key is included and the ending key is excluded from
- * the results.
- *
- * @param from the start key
- * @param to the end key
- * @return the sorted iterator
- * @throws IOException scan error
- */
- SortedIterator<V> scan(byte[] from, byte[] to) throws IOException;
-
- /**
- * Scans the keys and returns an iterator over the interval [equalTo, equalTo].
- *
- * @param equalTo the key to match
- * @return the sorted iterator
- * @throws IOException scan error
- */
- SortedIterator<V> scan(byte[] equalTo) throws IOException;
-
- /**
- * Scans the keys and allows iteration between the given keys.
- *
- * @param from the start key
- * @param fromInclusive true if the start key is included in the scan
- * @param to the end key
- * @param toInclusive true if the end key is included in the scan
- * @return the sorted iterator
- * @throws IOException scan error
- */
- SortedIterator<V> scan(byte[] from, boolean fromInclusive,
- byte[] to, boolean toInclusive) throws IOException;
-
- /**
- * Scans the keys and allows iteration between the given keys after applying
- * the metdata filter and the order flag. These parameters override values
- * configured using <code>withAscending</code> or <code>withFilter</code>.
- *
- * @param from the start key
- * @param fromInclusive true if the start key is included in the scan
- * @param to the end key
- * @param toInclusive true if the end key is included in the scan
- * @param ascending true if ascending
- * @param filter filters data based on metadata values
- * @return the sorted iterator
- * @throws IOException scan error
- */
- SortedIterator<V> scan(
- byte[] from, boolean fromInclusive,
- byte[] to, boolean toInclusive,
- boolean ascending,
- MetadataFilter filter) throws IOException;
-
- /**
- * Changes the iteration order of subsequent operations.
- *
- * @param ascending true if ascending order (default)
- * @return the reader
- */
- SortedReader<V> withAscending(boolean ascending);
-
- /**
- * Applies a metadata filter to subsequent operations.
- *
- * @param filter the filter to apply
- * @return the reader
- */
- SortedReader<V> withFilter(MetadataFilter filter);
-
- /**
- * Returns the comparator used for sorting keys.
- * @return the comparator
- */
- SerializedComparator getComparator();
-
- /**
- * Returns the statistics regarding the keys present in the sorted file.
- * @return the statistics
- * @throws IOException unable retrieve statistics
- */
- SortedStatistics getStatistics() throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/TrackedReference.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/TrackedReference.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/TrackedReference.java
deleted file mode 100644
index 2934f07..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/TrackedReference.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Tracks the usage of a reference.
- *
- *
- * @param <T> the reference type
- */
-public final class TrackedReference<T> {
- /** the referent */
- private final T ref;
-
- /** the number of uses */
- private final AtomicInteger uses;
-
- /** list of users using this reference. Mainly for debugging */
- final ConcurrentHashMap<String, AtomicInteger> users;
-
- /**
- * Decrements the use count of each reference.
- * @param refs the references to decrement
- */
- public static <T> void decrementAll(Iterable<TrackedReference<T>> refs) {
- for (TrackedReference<?> tr : refs) {
- tr.decrement();
- }
- }
-
- public TrackedReference(T ref) {
- this.ref = ref;
- uses = new AtomicInteger(0);
- users = new ConcurrentHashMap<String, AtomicInteger>();
- }
-
- /**
- * Returns the referent.
- * @return the referent
- */
- public final T get() {
- return ref;
- }
-
- /**
- * Returns the current count.
- * @return the current uses
- */
- public int uses() {
- return uses.get();
- }
-
- /**
- * Returns true if the reference is in use.
- * @return true if used
- */
- public boolean inUse() {
- return uses() > 0;
- }
-
- /**
- * Increments the use count and returns the reference.
- * @return the reference
- */
- public T getAndIncrement() {
- increment();
- return ref;
- }
-
- /**
- * Increments the use counter and returns the current count.
- * @return the current uses
- */
- public int increment() {
- return increment(null);
- }
-
- /**
- * Increments the use counter and returns the current count.
- * @return the current uses
- */
- public int increment(String user) {
- int val = uses.incrementAndGet();
- if (user != null) {
- AtomicInteger counter = users.get(user);
- if (counter == null) {
- counter = new AtomicInteger();
- users.putIfAbsent(user, counter);
- counter = users.get(user);
- }
- counter.incrementAndGet();
- }
- assert val >= 1;
-
- return val;
- }
-
- /**
- * Decrements the use counter and returns the current count.
- * @return the current uses
- */
- public int decrement() {
- return decrement(null);
- }
-
- /**
- * Decrements the use counter and returns the current count.
- * @return the current uses
- */
- public int decrement(String user) {
- int val = uses.decrementAndGet();
- assert val >= 0;
- if (user != null) {
- AtomicInteger counter = users.get(user);
- if (counter != null) {
- counter.decrementAndGet();
- }
- }
-
- return val;
- }
-
- @Override
- public String toString() {
- if (users != null) {
- StringBuffer sb = new StringBuffer();
- sb.append(ref.toString()).append(": ").append(uses());
- for (Entry<String, AtomicInteger> user : users.entrySet()) {
- sb.append(" ").append(user.getKey()).append(":").append(user.getValue().intValue());
- }
- return sb.toString();
- }
- return uses() + ": " + ref.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java
index e6c07d9..ca7818a 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java
@@ -1145,7 +1145,7 @@ public abstract class BaseCommand implements Command {
VersionTagHolder versionHolder = new VersionTagHolder();
ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID();
// From Get70.getValueAndIsObject()
- Object data = region.get(entryKey, null, true, true, true, id, versionHolder, true, false);
+ Object data = region.get(entryKey, null, true, true, true, id, versionHolder, true);
VersionTag vt = versionHolder.getVersionTag();
updateValues(values, entryKey, data, vt);
@@ -1252,7 +1252,7 @@ public abstract class BaseCommand implements Command {
}
ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID();
- data = region.get(key, null, true, true, true, id, versionHolder, true, false);
+ data = region.get(key, null, true, true, true, id, versionHolder, true);
versionTag = versionHolder.getVersionTag();
updateValues(values, key, data, versionTag);
@@ -1345,7 +1345,7 @@ public abstract class BaseCommand implements Command {
key = it.next();
versionHolder = new VersionTagHolder();
- Object value = region.get(key, null, true, true, true, requestingClient, versionHolder, true, false);
+ Object value = region.get(key, null, true, true, true, requestingClient, versionHolder, true);
updateValues(values, key, value, versionHolder.getVersionTag());
@@ -1548,7 +1548,7 @@ public abstract class BaseCommand implements Command {
ClientProxyMembershipID id = servConn == null ? null : servConn
.getProxyID();
data = region.get(key, null, true, true, true, id, versionHolder,
- true, false);
+ true);
versionTag = versionHolder.getVersionTag();
updateValues(values, key, data, versionTag);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Get70.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Get70.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Get70.java
index 7898b3c..55047c7 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Get70.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Get70.java
@@ -24,7 +24,6 @@ import com.gemstone.gemfire.cache.client.internal.GetOp;
import com.gemstone.gemfire.cache.operations.GetOperationContext;
import com.gemstone.gemfire.cache.operations.internal.GetOperationContextImpl;
import com.gemstone.gemfire.distributed.internal.DistributionStats;
-import com.gemstone.gemfire.internal.Assert;
import com.gemstone.gemfire.internal.cache.CachedDeserializable;
import com.gemstone.gemfire.internal.cache.EntryEventImpl;
import com.gemstone.gemfire.internal.cache.LocalRegion;
@@ -305,7 +304,7 @@ public class Get70 extends BaseCommand {
// } else {
ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID();
VersionTagHolder versionHolder = new VersionTagHolder();
- data = ((LocalRegion) region).get(key, callbackArg, true, true, true, id, versionHolder, true, true /*allowReadFromHDFS*/);
+ data = ((LocalRegion) region).get(key, callbackArg, true, true, true, id, versionHolder, true);
// }
versionTag = versionHolder.getVersionTag();
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Request.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Request.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Request.java
index 2a617a8..69d54a1 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Request.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Request.java
@@ -242,7 +242,7 @@ public class Request extends BaseCommand {
boolean isObject = true;
ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID();
- Object data = ((LocalRegion) region).get(key, callbackArg, true, true, true, id, null, false, true/*allowReadFromHDFS*/);
+ Object data = ((LocalRegion) region).get(key, callbackArg, true, true, true, id, null, false);
// If the value in the VM is a CachedDeserializable,
// get its value. If it is Token.REMOVED, Token.DESTROYED,
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/ClientTXRegionStub.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/ClientTXRegionStub.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/ClientTXRegionStub.java
index e896649..90522b2 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/ClientTXRegionStub.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/ClientTXRegionStub.java
@@ -67,8 +67,8 @@ public class ClientTXRegionStub implements TXRegionStub {
public Object findObject(KeyInfo keyInfo, boolean isCreate,
- boolean generateCallbacks, Object value, boolean preferCD,
- ClientProxyMembershipID requestingClient, EntryEventImpl event, boolean allowReadFromHDFS) {
+ boolean generateCallbacks, Object value, boolean preferCD,
+ ClientProxyMembershipID requestingClient, EntryEventImpl event) {
return proxy.get(keyInfo.getKey(), keyInfo.getCallbackArg(), event);
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/DistributedTXRegionStub.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/DistributedTXRegionStub.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/DistributedTXRegionStub.java
index 7c7df53..1637c4a 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/DistributedTXRegionStub.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/DistributedTXRegionStub.java
@@ -17,12 +17,10 @@
package com.gemstone.gemfire.internal.cache.tx;
import java.util.Collections;
-import java.util.Map;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.EntryNotFoundException;
import com.gemstone.gemfire.cache.RegionDestroyedException;
-import com.gemstone.gemfire.cache.RemoteTransactionException;
import com.gemstone.gemfire.cache.TransactionDataNodeHasDepartedException;
import com.gemstone.gemfire.cache.TransactionDataNotColocatedException;
import com.gemstone.gemfire.cache.TransactionException;
@@ -32,7 +30,6 @@ import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedM
import com.gemstone.gemfire.internal.cache.DistributedPutAllOperation;
import com.gemstone.gemfire.internal.cache.DistributedRemoveAllOperation;
import com.gemstone.gemfire.internal.cache.EntryEventImpl;
-import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.KeyInfo;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.PartitionedRegionException;
@@ -54,7 +51,6 @@ import com.gemstone.gemfire.internal.cache.partitioned.RemoteSizeMessage;
import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.util.concurrent.StoppableReentrantReadWriteLock;
public class DistributedTXRegionStub extends AbstractPeerTXRegionStub {
@@ -159,9 +155,13 @@ public class DistributedTXRegionStub extends AbstractPeerTXRegionStub {
}
- public Object findObject(KeyInfo keyInfo, boolean isCreate,
- boolean generateCallbacks, Object value, boolean preferCD,
- ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean allowReadFromHDFS) {
+ public Object findObject(KeyInfo keyInfo,
+ boolean isCreate,
+ boolean generateCallbacks,
+ Object value,
+ boolean preferCD,
+ ClientProxyMembershipID requestingClient,
+ EntryEventImpl clientEvent) {
Object retVal = null;
final Object key = keyInfo.getKey();
final Object callbackArgument = keyInfo.getCallbackArg();
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/PartitionedTXRegionStub.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/PartitionedTXRegionStub.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/PartitionedTXRegionStub.java
index 6723646..01b1ed8 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/PartitionedTXRegionStub.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/PartitionedTXRegionStub.java
@@ -275,15 +275,15 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub {
public Object findObject(KeyInfo keyInfo, boolean isCreate,
- boolean generateCallbacks, Object value, boolean peferCD,
- ClientProxyMembershipID requestingClient,
- EntryEventImpl clientEvent, boolean allowReadFromHDFS) {
+ boolean generateCallbacks, Object value, boolean peferCD,
+ ClientProxyMembershipID requestingClient,
+ EntryEventImpl clientEvent) {
Object retVal = null;
final Object key = keyInfo.getKey();
final Object callbackArgument = keyInfo.getCallbackArg();
PartitionedRegion pr = (PartitionedRegion)region;
try {
- retVal = pr.getRemotely((InternalDistributedMember)state.getTarget(), keyInfo.getBucketId(), key, callbackArgument, peferCD, requestingClient, clientEvent, false, allowReadFromHDFS);
+ retVal = pr.getRemotely((InternalDistributedMember)state.getTarget(), keyInfo.getBucketId(), key, callbackArgument, peferCD, requestingClient, clientEvent, false);
} catch (TransactionException e) {
RuntimeException re = getTransactionException(keyInfo, e);
re.initCause(e.getCause());
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/TXRegionStub.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/TXRegionStub.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/TXRegionStub.java
index 482882f..f2859f1 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/TXRegionStub.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/TXRegionStub.java
@@ -42,8 +42,8 @@ public interface TXRegionStub {
boolean containsValueForKey(KeyInfo keyInfo);
Object findObject(KeyInfo keyInfo, boolean isCreate,
- boolean generateCallbacks, Object value, boolean preferCD,
- ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean allowReadFromHDFS);
+ boolean generateCallbacks, Object value, boolean preferCD,
+ ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent);
Object getEntryForIterator(KeyInfo keyInfo, boolean allowTombstone);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
index 94524bd..fe09d03 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
@@ -157,8 +157,6 @@ public abstract class AbstractGatewaySender implements GatewaySender,
protected boolean isBucketSorted;
- protected boolean isHDFSQueue;
-
protected boolean isMetaQueue;
private int parallelismForReplicatedRegion;
@@ -260,7 +258,6 @@ public abstract class AbstractGatewaySender implements GatewaySender,
this.maxMemoryPerDispatcherQueue = this.queueMemory / this.dispatcherThreads;
this.myDSId = InternalDistributedSystem.getAnyInstance().getDistributionManager().getDistributedSystemId();
this.serialNumber = DistributionAdvisor.createSerialNumber();
- this.isHDFSQueue = attrs.isHDFSQueue();
this.isMetaQueue = attrs.isMetaQueue();
if (!(this.cache instanceof CacheCreation)) {
this.stopper = new Stopper(cache.getCancelCriterion());
@@ -269,8 +266,7 @@ public abstract class AbstractGatewaySender implements GatewaySender,
this.statistics = new GatewaySenderStats(cache.getDistributedSystem(),
id);
}
- if (!attrs.isHDFSQueue())
- initializeEventIdIndex();
+ initializeEventIdIndex();
}
this.isBucketSorted = attrs.isBucketSorted();
}
@@ -318,12 +314,10 @@ public abstract class AbstractGatewaySender implements GatewaySender,
cache.getDistributedSystem(), AsyncEventQueueImpl
.getAsyncEventQueueIdFromSenderId(id));
}
- if (!attrs.isHDFSQueue())
- initializeEventIdIndex();
+ initializeEventIdIndex();
}
this.isBucketSorted = attrs.isBucketSorted();
- this.isHDFSQueue = attrs.isHDFSQueue();
-
+
}
public GatewaySenderAdvisor getSenderAdvisor() {
@@ -482,10 +476,6 @@ public abstract class AbstractGatewaySender implements GatewaySender,
return this.isBucketSorted;
}
- public boolean getIsHDFSQueue() {
- return this.isHDFSQueue;
- }
-
public boolean getIsMetaQueue() {
return this.isMetaQueue;
}
@@ -863,12 +853,6 @@ public abstract class AbstractGatewaySender implements GatewaySender,
return;
}
- if (getIsHDFSQueue() && event.getOperation().isEviction()) {
- if (logger.isDebugEnabled())
- logger.debug("Eviction event not queued: " + event);
- stats.incEventsNotQueued();
- return;
- }
// this filter is defined by Asif which exist in old wan too. new wan has
// other GatewaEventFilter. Do we need to get rid of this filter. Cheetah is
// not cinsidering this filter
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java
index 025616d..1cef940 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java
@@ -30,7 +30,6 @@ import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
public class GatewaySenderAttributes {
public static final boolean DEFAULT_IS_BUCKETSORTED = true;
- public static final boolean DEFAULT_IS_HDFSQUEUE = false;
public static final boolean DEFAULT_IS_META_QUEUE = false;
@@ -82,7 +81,6 @@ public class GatewaySenderAttributes {
public boolean isBucketSorted = GatewaySenderAttributes.DEFAULT_IS_BUCKETSORTED;
- public boolean isHDFSQueue = GatewaySenderAttributes.DEFAULT_IS_HDFSQUEUE;
public boolean isMetaQueue = GatewaySenderAttributes.DEFAULT_IS_META_QUEUE;
public int getSocketBufferSize() {
@@ -191,9 +189,6 @@ public class GatewaySenderAttributes {
public GatewayEventSubstitutionFilter getGatewayEventSubstitutionFilter() {
return this.eventSubstitutionFilter;
}
- public boolean isHDFSQueue() {
- return this.isHDFSQueue;
- }
public boolean isMetaQueue() {
return this.isMetaQueue;
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
index b63c7cb..07a3be5 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
@@ -36,9 +36,6 @@ import com.gemstone.gemfire.InternalGemFireException;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.EntryEvent;
import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSGatewayEventImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSParallelGatewaySenderQueue;
import com.gemstone.gemfire.cache.wan.GatewayQueueEvent;
import com.gemstone.gemfire.internal.cache.EntryEventImpl;
import com.gemstone.gemfire.internal.cache.EnumListenerEvent;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
index 8524ccf..f995ba4 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
@@ -22,8 +22,6 @@ package com.gemstone.gemfire.internal.cache.wan.parallel;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.CacheListener;
import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSGatewayEventImpl;
import com.gemstone.gemfire.internal.cache.Conflatable;
import com.gemstone.gemfire.internal.cache.DistributedRegion;
import com.gemstone.gemfire.internal.cache.ForceReattemptException;
@@ -188,11 +186,6 @@ public class ConcurrentParallelGatewaySenderQueue implements RegionQueue {
getPGSProcessor( bucketId).notifyEventProcessorIfRequired(bucketId);
}
- public HDFSBucketRegionQueue getBucketRegionQueue(PartitionedRegion region,
- int bucketId) throws ForceReattemptException {
- return getPGSProcessor(bucketId).getBucketRegionQueue(region, bucketId);
- }
-
public void clear(PartitionedRegion pr, int bucketId) {
getPGSProcessor(bucketId).clear(pr, bucketId);
}
@@ -207,11 +200,6 @@ public class ConcurrentParallelGatewaySenderQueue implements RegionQueue {
getPGSProcessor(bucketId).conflateEvent(conflatableObject, bucketId, tailKey);
}
- public HDFSGatewayEventImpl get(PartitionedRegion region, byte[] regionKey,
- int bucketId) throws ForceReattemptException {
- return getPGSProcessor(bucketId).get(region, regionKey, bucketId);
- }
-
public void addShadowPartitionedRegionForUserRR(DistributedRegion userRegion) {
for(int i =0; i< processors.length; i++){
processors[i].addShadowPartitionedRegionForUserRR(userRegion);;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
index 417ba13..11502af 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
@@ -28,9 +28,6 @@ import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.EntryEvent;
import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSGatewayEventImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSParallelGatewaySenderQueue;
import com.gemstone.gemfire.cache.wan.GatewayQueueEvent;
import com.gemstone.gemfire.internal.cache.Conflatable;
import com.gemstone.gemfire.internal.cache.DistributedRegion;
@@ -104,10 +101,7 @@ public class ParallelGatewaySenderEventProcessor extends
}
ParallelGatewaySenderQueue queue;
- if (sender.getIsHDFSQueue())
- queue = new HDFSParallelGatewaySenderQueue(this.sender, targetRs, this.index, this.nDispatcher);
- else
- queue = new ParallelGatewaySenderQueue(this.sender, targetRs, this.index, this.nDispatcher);
+ queue = new ParallelGatewaySenderQueue(this.sender, targetRs, this.index, this.nDispatcher);
queue.start();
this.queue = queue;
@@ -145,12 +139,8 @@ public class ParallelGatewaySenderEventProcessor extends
// while merging 42004, kept substituteValue as it is(it is barry's
// change 42466). bucketID is merged with eventID.getBucketID
- if (!sender.getIsHDFSQueue())
gatewayQueueEvent = new GatewaySenderEventImpl(operation, event,
substituteValue, true, eventID.getBucketID());
- else
- gatewayQueueEvent = new HDFSGatewayEventImpl(operation,
- event, substituteValue, true, eventID.getBucketID());
if (getSender().beforeEnqueue(gatewayQueueEvent)) {
long start = getSender().getStatistics().startTime();
@@ -208,16 +198,6 @@ public class ParallelGatewaySenderEventProcessor extends
((ParallelGatewaySenderQueue)this.queue).conflateEvent(conflatableObject, bucketId, tailKey);
}
- public HDFSGatewayEventImpl get(PartitionedRegion region, byte[] regionKey,
- int bucketId) throws ForceReattemptException {
- return ((HDFSParallelGatewaySenderQueue)this.queue).get(region, regionKey, bucketId);
- }
-
- public HDFSBucketRegionQueue getBucketRegionQueue(PartitionedRegion region,
- int bucketId) throws ForceReattemptException {
- return ((HDFSParallelGatewaySenderQueue)this.queue).getBucketRegionQueue(region, bucketId);
- }
-
public void addShadowPartitionedRegionForUserPR(PartitionedRegion pr) {
// TODO Auto-generated method stub
((ParallelGatewaySenderQueue)this.queue).addShadowPartitionedRegionForUserPR(pr);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index b0b1a32..46ff263 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -492,7 +492,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
if (this.userRegionNameToshadowPRMap.containsKey(regionName))
return;
- if(!isUsedForHDFS() && userPR.getDataPolicy().withPersistence() && !sender.isPersistenceEnabled()){
+ if(userPR.getDataPolicy().withPersistence() && !sender.isPersistenceEnabled()){
throw new GatewaySenderException(
LocalizedStrings.ParallelGatewaySenderQueue_NON_PERSISTENT_GATEWAY_SENDER_0_CAN_NOT_BE_ATTACHED_TO_PERSISTENT_REGION_1
.toLocalizedString(new Object[] { this.sender.getId(),
@@ -552,7 +552,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
}
ParallelGatewaySenderQueueMetaRegion meta = metaRegionFactory.newMetataRegion(cache,
- prQName, ra, sender, isUsedForHDFS());
+ prQName, ra, sender);
try {
prQ = (PartitionedRegion)cache
@@ -630,10 +630,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
bucketRegion.clear();
}
}
- protected boolean isUsedForHDFS()
- {
- return false;
- }
protected void afterRegionAdd (PartitionedRegion userPR) {
}
@@ -1857,18 +1853,12 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
public ParallelGatewaySenderQueueMetaRegion(String regionName,
RegionAttributes attrs, LocalRegion parentRegion,
GemFireCacheImpl cache, AbstractGatewaySender pgSender) {
- this( regionName, attrs, parentRegion, cache, pgSender, false);
- }
- public ParallelGatewaySenderQueueMetaRegion(String regionName,
- RegionAttributes attrs, LocalRegion parentRegion,
- GemFireCacheImpl cache, AbstractGatewaySender pgSender, boolean isUsedForHDFS) {
super(regionName, attrs, parentRegion, cache,
new InternalRegionArguments().setDestroyLockFlag(true)
.setRecreateFlag(false).setSnapshotInputStream(null)
.setImageTarget(null)
.setIsUsedForParallelGatewaySenderQueue(true)
- .setParallelGatewaySender((AbstractGatewaySender)pgSender)
- .setIsUsedForHDFSParallelGatewaySenderQueue(isUsedForHDFS));
+ .setParallelGatewaySender((AbstractGatewaySender)pgSender));
this.sender = (AbstractGatewaySender)pgSender;
}
@@ -1925,9 +1915,9 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
static class MetaRegionFactory {
ParallelGatewaySenderQueueMetaRegion newMetataRegion(
- GemFireCacheImpl cache, final String prQName, final RegionAttributes ra, AbstractGatewaySender sender, boolean isUsedForHDFS) {
+ GemFireCacheImpl cache, final String prQName, final RegionAttributes ra, AbstractGatewaySender sender) {
ParallelGatewaySenderQueueMetaRegion meta = new ParallelGatewaySenderQueueMetaRegion(
- prQName, ra, null, cache, sender, isUsedForHDFS);
+ prQName, ra, null, cache, sender);
return meta;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java
index 77f9596..0015665 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java
@@ -41,7 +41,6 @@ public class AsyncEventQueueCreation implements AsyncEventQueue {
private int maxQueueMemory = 0;
private boolean isParallel = false;
private boolean isBucketSorted = false;
- private boolean isHDFSQueue = false;
private int dispatcherThreads = 1;
private OrderPolicy orderPolicy = OrderPolicy.KEY;
@@ -62,7 +61,6 @@ public class AsyncEventQueueCreation implements AsyncEventQueue {
this.orderPolicy = senderAttrs.policy;
this.asyncEventListener = eventListener;
this.isBucketSorted = senderAttrs.isBucketSorted;
- this.isHDFSQueue = senderAttrs.isHDFSQueue;
this.gatewayEventSubstitutionFilter = senderAttrs.eventSubstitutionFilter;
}
@@ -213,11 +211,4 @@ public class AsyncEventQueueCreation implements AsyncEventQueue {
public void setBucketSorted(boolean isBucketSorted) {
this.isBucketSorted = isBucketSorted;
}
- public boolean isHDFSQueue() {
- return this.isHDFSQueue;
- }
-
- public void setIsHDFSQueue(boolean isHDFSQueue) {
- this.isHDFSQueue = isHDFSQueue;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheCreation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheCreation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheCreation.java
index 915bde9..d52d05e 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheCreation.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheCreation.java
@@ -91,11 +91,6 @@ import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.i18n.LogWriterI18n;
import com.gemstone.gemfire.internal.Assert;
-import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSIntegrationUtil;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreCreation;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreFactoryImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
import com.gemstone.gemfire.internal.cache.CacheServerImpl;
import com.gemstone.gemfire.internal.cache.CacheConfig;
import com.gemstone.gemfire.internal.cache.CacheServerLauncher;
@@ -198,8 +193,7 @@ public class CacheCreation implements InternalCache {
* This is important for unit testing 44914.
*/
protected final Map diskStores = new LinkedHashMap();
- protected final Map hdfsStores = new LinkedHashMap();
-
+
private final List<File> backups = new ArrayList<File>();
private CacheConfig cacheConfig = new CacheConfig();
@@ -513,13 +507,6 @@ public class CacheCreation implements InternalCache {
}
}
- for(Iterator iter = this.hdfsStores.entrySet().iterator(); iter.hasNext(); ) {
- Entry entry = (Entry) iter.next();
- HDFSStoreCreation hdfsStoreCreation = (HDFSStoreCreation) entry.getValue();
- HDFSStoreFactory storefactory = cache.createHDFSStoreFactory(hdfsStoreCreation);
- storefactory.create((String) entry.getKey());
- }
-
cache.initializePdxRegistry();
@@ -530,19 +517,6 @@ public class CacheCreation implements InternalCache {
(RegionAttributesCreation) getRegionAttributes(id);
creation.inheritAttributes(cache, false);
- // TODO: HDFS: HDFS store/queue will be mapped against region path and not
- // the attribute id; don't really understand what this is trying to do
- if (creation.getHDFSStoreName() != null)
- {
- HDFSStoreImpl store = cache.findHDFSStore(creation.getHDFSStoreName());
- if(store == null) {
- HDFSIntegrationUtil.createDefaultAsyncQueueForHDFS((Cache)cache, creation.getHDFSWriteOnly(), id);
- }
- }
- if (creation.getHDFSStoreName() != null && creation.getPartitionAttributes().getColocatedWith() == null) {
- creation.addAsyncEventQueueId(HDFSStoreFactoryImpl.getEventQueueName(id));
- }
-
RegionAttributes attrs;
// Don't let the RegionAttributesCreation escape to the user
AttributesFactory factory = new AttributesFactory(creation);
@@ -1421,17 +1395,6 @@ public class CacheCreation implements InternalCache {
}
@Override
- public Collection<HDFSStoreImpl> getHDFSStores() {
- return this.hdfsStores.values();
- }
-
- public void addHDFSStore(String name, HDFSStoreCreation hs) {
- this.hdfsStores.put(name, hs);
- }
-
-
-
- @Override
public DistributedMember getMyId() {
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXml.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXml.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXml.java
index c6b0509..aa7d49a 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXml.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXml.java
@@ -487,8 +487,6 @@ public abstract class CacheXml implements EntityResolver2, ErrorHandler {
protected static final String PERSISTENT_REPLICATE_DP = "persistent-replicate";
protected static final String PARTITION_DP = "partition";
protected static final String PERSISTENT_PARTITION_DP = "persistent-partition";
- protected static final String HDFS_PARTITION_DP = "hdfs-partition";
- protected static final String HDFS_PERSISTENT_PARTITION_DP = "hdfs-persistent-partition";
/** The name of the <code>keep-alive-timeout</code> attribute */
protected static final String KEEP_ALIVE_TIMEOUT = "keep-alive-timeout";
@@ -765,35 +763,6 @@ public abstract class CacheXml implements EntityResolver2, ErrorHandler {
public static final String ASYNC_EVENT_QUEUE = "async-event-queue";
protected static final String ASYNC_EVENT_QUEUE_IDS = "async-event-queue-ids";
- protected static final String HDFS_EVENT_QUEUE = "hdfs-event-queue";
- protected static final String HDFS_STORE_NAME = "hdfs-store-name";
- public static final String HDFS_STORE = "hdfs-store";
- protected static final String HDFS_HOME_DIR = "home-dir";
- protected static final String HDFS_READ_CACHE_SIZE = "read-cache-size";
- protected static final String HDFS_MAX_MEMORY = "max-memory";
- protected static final String HDFS_BATCH_SIZE = "batch-size";
- protected static final String HDFS_BATCH_INTERVAL = "batch-interval";
- protected static final String HDFS_DISPATCHER_THREADS = "dispatcher-threads";
- protected static final String HDFS_BUFFER_PERSISTENT = "buffer-persistent";
- protected static final String HDFS_SYNCHRONOUS_DISK_WRITE = "synchronous-disk-write";
- protected static final String HDFS_DISK_STORE = "disk-store";
- protected static final String HDFS_MAX_WRITE_ONLY_FILE_SIZE = "max-write-only-file-size";
- public static final String HDFS_WRITE_ONLY_FILE_ROLLOVER_INTERVAL = "write-only-file-rollover-interval";
-
- protected static final String HDFS_NAMENODE_URL = "namenode-url";
- protected static final String HDFS_CLIENT_CONFIG_FILE = "hdfs-client-config-file";
- public static final String HDFS_PURGE_INTERVAL = "purge-interval";
- public static final String HDFS_MAJOR_COMPACTION = "major-compaction";
- public static final String HDFS_MAJOR_COMPACTION_INTERVAL = "major-compaction-interval";
- public static final String HDFS_MAJOR_COMPACTION_THREADS = "major-compaction-threads";
- public static final String HDFS_MINOR_COMPACTION = "minor-compaction";
- public static final String HDFS_MINOR_COMPACTION_THREADS = "minor-compaction-threads";
-
- public static final String HDFS_TIME_FOR_FILE_ROLLOVER = "file-rollover-time-secs";
-
- protected static final String HDFS_WRITE_ONLY = "hdfs-write-only";
- protected static final String HDFS_QUEUE_BATCH_SIZE = "batch-size-mb";
-
/** The name of the <code>compressor</code> attribute */
protected static final String COMPRESSOR = "compressor";
/** The name of the <code>off-heap</code> attribute