You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/04/27 22:49:58 UTC
[12/25] incubator-geode git commit: GEODE-10: Reinstating HDFS
persistence code
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/CursorIterator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/CursorIterator.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/CursorIterator.java
new file mode 100644
index 0000000..dacc208
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/CursorIterator.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.persistence.soplog;
+
+import java.util.Iterator;
+
+/**
+ * Provides an {@link Iterator} that allows access to the current iteration
+ * element. The implementor must provide access to the current element
+ * as well as a means to move to the next element.
+ *
+ *
+ * @param <E> the element type
+ */
+public interface CursorIterator<E> extends Iterator<E> {
+ /**
+ * Returns the element at the current position.
+ * @return the current element
+ */
+ E current();
+
+ /**
+ * Provides an iteration cursor by wrapping an {@link Iterator}.
+ *
+ * @param <E> the element type
+ */
+ public static class WrappedIterator<E> implements CursorIterator<E> {
+ /** the underlying iterator */
+ private final Iterator<E> src;
+
+ /** the current iteration element */
+ private E current;
+
+ public WrappedIterator(Iterator<E> src) {
+ this.src = src;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return src.hasNext();
+ }
+
+ @Override
+ public E next() {
+ current = src.next();
+ return current;
+ }
+
+ @Override
+ public E current() {
+ return current;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Returns the unwrapped interator.
+ * @return the iterator
+ */
+ public Iterator<E> unwrap() {
+ return src;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/DelegatingSerializedComparator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/DelegatingSerializedComparator.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/DelegatingSerializedComparator.java
new file mode 100644
index 0000000..52470d0
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/DelegatingSerializedComparator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.persistence.soplog;
+
+import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SerializedComparator;
+
+/**
+ * Delegates object comparisons to one or more embedded comparators.
+ *
+ */
+public interface DelegatingSerializedComparator extends SerializedComparator {
+ /**
+ * Injects the embedded comparators.
+ * @param comparators the comparators for delegation
+ */
+ void setComparators(SerializedComparator[] comparators);
+
+ /**
+ * Returns the embedded comparators.
+ * @return the comparators
+ */
+ SerializedComparator[] getComparators();
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/HFileStoreStatistics.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/HFileStoreStatistics.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/HFileStoreStatistics.java
new file mode 100644
index 0000000..fdf3852
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/HFileStoreStatistics.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.persistence.soplog;
+
+import static com.gemstone.gemfire.distributed.internal.DistributionStats.getStatTime;
+
+import com.gemstone.gemfire.StatisticDescriptor;
+import com.gemstone.gemfire.Statistics;
+import com.gemstone.gemfire.StatisticsFactory;
+import com.gemstone.gemfire.StatisticsType;
+import com.gemstone.gemfire.StatisticsTypeFactory;
+import com.gemstone.gemfire.internal.DummyStatisticsFactory;
+import com.gemstone.gemfire.internal.StatisticsTypeFactoryImpl;
+
+public class HFileStoreStatistics {
+ private final Statistics stats;
+
+ private final CacheOperation blockCache;
+
+ public HFileStoreStatistics(String typeName, String name) {
+ this(new DummyStatisticsFactory(), typeName, name);
+ }
+
+ public HFileStoreStatistics(StatisticsFactory factory, String typeName, String name) {
+ StatisticsTypeFactory tf = StatisticsTypeFactoryImpl.singleton();
+
+ StatisticDescriptor bcMisses = tf.createLongCounter("blockCacheMisses", "The total number of block cache misses", "misses");
+ StatisticDescriptor bcHits = tf.createLongCounter("blockCacheHits", "The total number of block cache hits", "hits");
+ StatisticDescriptor bcCached = tf.createLongGauge("blocksCached", "The current number of cached blocks", "blocks");
+ StatisticDescriptor bcBytesCached = tf.createLongGauge("blockBytesCached", "The current number of bytes cached", "bytes");
+ StatisticDescriptor bcBytesEvicted = tf.createLongCounter("blockBytesEvicted", "The total number of bytes cached", "bytes");
+
+
+ StatisticsType type = tf.createType(typeName,
+ "Statistics about structured I/O operations for a region", new StatisticDescriptor[] {
+ bcMisses, bcHits, bcCached, bcBytesCached, bcBytesEvicted
+ });
+
+ blockCache = new CacheOperation(bcMisses.getId(), bcHits.getId(), bcCached.getId(), bcBytesCached.getId(), bcBytesEvicted.getId());
+
+
+ stats = factory.createAtomicStatistics(type, name);
+ }
+
+ public void close() {
+ stats.close();
+ }
+
+ public Statistics getStats() {
+ return stats;
+ }
+
+ public CacheOperation getBlockCache() {
+ return blockCache;
+ }
+
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("blockCache = {").append(blockCache).append("}\n");
+
+ return sb.toString();
+ }
+
+ public class TimedOperation {
+ protected final int countId;
+ protected final int inProgressId;
+ protected final int timeId;
+ private final int errorsId;
+
+ public TimedOperation(int count, int inProgress, int time, int errors) {
+ this.countId = count;
+ this.inProgressId = inProgress;
+ this.timeId = time;
+ this.errorsId = errors;
+ }
+
+ public long begin() {
+ stats.incLong(inProgressId, 1);
+ return getStatTime();
+ }
+
+ public long end(long start) {
+ stats.incLong(inProgressId, -1);
+ stats.incLong(countId, 1);
+ stats.incLong(timeId, getStatTime() - start);
+ return getStatTime();
+ }
+
+ public void error(long start) {
+ end(start);
+ stats.incLong(errorsId, 1);
+ }
+
+ public long getCount() {
+ return stats.getLong(countId);
+ }
+
+ public long getInProgress() {
+ return stats.getLong(inProgressId);
+ }
+
+ public long getTime() {
+ return stats.getLong(timeId);
+ }
+
+ public long getErrors() {
+ return stats.getLong(errorsId);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("count=").append(getCount());
+ sb.append(";inProgress=").append(getInProgress());
+ sb.append(";errors=").append(getErrors());
+ sb.append(";time=").append(getTime());
+
+ return sb.toString();
+ }
+ }
+
+ public class CacheOperation {
+ private final int missesId;
+ private final int hitsId;
+ private final int cachedId;
+ private final int bytesCachedId;
+ private final int bytesEvictedId;
+
+ public CacheOperation(int missesId, int hitsId, int cachedId,
+ int bytesCachedId, int bytesEvictedId) {
+ this.missesId = missesId;
+ this.hitsId = hitsId;
+ this.cachedId = cachedId;
+ this.bytesCachedId = bytesCachedId;
+ this.bytesEvictedId = bytesEvictedId;
+ }
+
+ public void store(long bytes) {
+ stats.incLong(cachedId, 1);
+ stats.incLong(bytesCachedId, bytes);
+ }
+
+ public void evict(long bytes) {
+ stats.incLong(cachedId, -1);
+ stats.incLong(bytesCachedId, -bytes);
+ stats.incLong(bytesEvictedId, bytes);
+ }
+
+ public void hit() {
+ stats.incLong(hitsId, 1);
+ }
+
+ public void miss() {
+ stats.incLong(missesId, 1);
+ }
+
+ public long getMisses() {
+ return stats.getLong(missesId);
+ }
+
+ public long getHits() {
+ return stats.getLong(hitsId);
+ }
+
+ public long getCached() {
+ return stats.getLong(cachedId);
+ }
+
+ public long getBytesCached() {
+ return stats.getLong(bytesCachedId);
+ }
+
+ public long getBytesEvicted() {
+ return stats.getLong(bytesEvictedId);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("misses=").append(getMisses());
+ sb.append(";hits=").append(getHits());
+ sb.append(";cached=").append(getCached());
+ sb.append(";bytesCached=").append(getBytesCached());
+ sb.append(";bytesEvicted=").append(getBytesEvicted());
+
+ return sb.toString();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/KeyValueIterator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/KeyValueIterator.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/KeyValueIterator.java
new file mode 100644
index 0000000..df7e1ac
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/KeyValueIterator.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.persistence.soplog;
+
+import java.util.Iterator;
+
+/**
+ * Provides an {@link Iterator} view over a collection of keys and values. The
+ * implementor must provide access to the current key/value as well as a means
+ * to move to the next pair.
+ *
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public interface KeyValueIterator<K, V> extends Iterator<K> {
+ /**
+ * Returns the key at the current position.
+ * @return the key
+ */
+ public K key();
+
+ /**
+ * Returns the value at the current position.
+ * @return the value
+ */
+ public abstract V value();
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogStatistics.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogStatistics.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogStatistics.java
new file mode 100644
index 0000000..35baafb
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogStatistics.java
@@ -0,0 +1,505 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.persistence.soplog;
+
+import static com.gemstone.gemfire.distributed.internal.DistributionStats.getStatTime;
+
+import com.gemstone.gemfire.StatisticDescriptor;
+import com.gemstone.gemfire.Statistics;
+import com.gemstone.gemfire.StatisticsFactory;
+import com.gemstone.gemfire.StatisticsType;
+import com.gemstone.gemfire.StatisticsTypeFactory;
+import com.gemstone.gemfire.internal.DummyStatisticsFactory;
+import com.gemstone.gemfire.internal.StatisticsTypeFactoryImpl;
+
+public class SortedOplogStatistics {
+ private final Statistics stats;
+
+ private final IOOperation read;
+ private final ScanOperation scan;
+ private final IOOperation write;
+ private final IOOperation put;
+ private final IOOperation flush;
+ private final IOOperation minorCompaction;
+ private final IOOperation majorCompaction;
+ private final BloomOperation bloom;
+ private final TimedOperation clear;
+ private final TimedOperation destroy;
+
+ private final IOOperation blockRead;
+ private final CacheOperation blockCache;
+
+ private final int activeFilesId;
+ private final int inactiveFilesId;
+ private final int activeReadersId;
+
+ private final int storeUsageBytesId;
+
+ public SortedOplogStatistics(String typeName, String name) {
+ this(new DummyStatisticsFactory(), typeName, name);
+ }
+
+ public SortedOplogStatistics(StatisticsFactory factory, String typeName, String name) {
+ StatisticsTypeFactory tf = StatisticsTypeFactoryImpl.singleton();
+
+ StatisticDescriptor readCount = tf.createLongCounter("reads", "The total number of read operations", "ops");
+ StatisticDescriptor readInProgress = tf.createLongGauge("readsInProgress", "The number of read operations in progress", "ops");
+ StatisticDescriptor readTime = tf.createLongCounter("readTime", "The total time spent reading from disk", "nanoseconds");
+ StatisticDescriptor readBytes = tf.createLongCounter("readBytes", "The total number of bytes read from disk", "bytes");
+ StatisticDescriptor readErrors = tf.createLongCounter("readErrors", "The total number of read errors", "errors");
+
+ StatisticDescriptor scanCount = tf.createLongCounter("scans", "The total number of scan operations", "ops");
+ StatisticDescriptor scanInProgress = tf.createLongGauge("scansInProgress", "The number of scan operations in progress", "ops");
+ StatisticDescriptor scanTime = tf.createLongCounter("scanTime", "The total time scanner was operational", "nanoseconds");
+ StatisticDescriptor scanBytes = tf.createLongCounter("scanBytes", "The total number of bytes scanned from disk", "bytes");
+ StatisticDescriptor scanErrors = tf.createLongCounter("scanErrors", "The total number of scan errors", "errors");
+ StatisticDescriptor scanIterations = tf.createLongCounter("scanIterations", "The total number of scan iterations", "ops");
+ StatisticDescriptor scanIterationTime = tf.createLongCounter("scanIterationTime", "The total time spent scanning from persistence layer", "nanoseconds");
+
+ StatisticDescriptor writeCount = tf.createLongCounter("writes", "The total number of write operations", "ops");
+ StatisticDescriptor writeInProgress = tf.createLongGauge("writesInProgress", "The number of write operations in progress", "ops");
+ StatisticDescriptor writeTime = tf.createLongCounter("writeTime", "The total time spent writing to disk", "nanoseconds");
+ StatisticDescriptor writeBytes = tf.createLongCounter("writeBytes", "The total number of bytes written to disk", "bytes");
+ StatisticDescriptor writeErrors = tf.createLongCounter("writeErrors", "The total number of write errors", "errors");
+
+ StatisticDescriptor putCount = tf.createLongCounter("puts", "The total number of put operations", "ops");
+ StatisticDescriptor putInProgress = tf.createLongGauge("putsInProgress", "The number of put operations in progress", "ops");
+ StatisticDescriptor putTime = tf.createLongCounter("putTime", "The total time spent in put calls", "nanoseconds");
+ StatisticDescriptor putBytes = tf.createLongCounter("putBytes", "The total number of bytes put", "bytes");
+ StatisticDescriptor putErrors = tf.createLongCounter("putErrors", "The total number of put errors", "errors");
+
+ StatisticDescriptor flushCount = tf.createLongCounter("flushes", "The total number of flush operations", "ops");
+ StatisticDescriptor flushInProgress = tf.createLongGauge("flushesInProgress", "The number of flush operations in progress", "ops");
+ StatisticDescriptor flushTime = tf.createLongCounter("flushTime", "The total time spent flushing to disk", "nanoseconds");
+ StatisticDescriptor flushBytes = tf.createLongCounter("flushBytes", "The total number of bytes flushed to disk", "bytes");
+ StatisticDescriptor flushErrors = tf.createLongCounter("flushErrors", "The total number of flush errors", "errors");
+
+ StatisticDescriptor minorCompactionCount = tf.createLongCounter("minorCompactions", "The total number of minor compaction operations", "ops");
+ StatisticDescriptor minorCompactionInProgress = tf.createLongGauge("minorCompactionsInProgress", "The number of minor compaction operations in progress", "ops");
+ StatisticDescriptor minorCompactionTime = tf.createLongCounter("minorCompactionTime", "The total time spent in minor compactions", "nanoseconds");
+ StatisticDescriptor minorCompactionBytes = tf.createLongCounter("minorCompactionBytes", "The total number of bytes collected during minor compactions", "bytes");
+ StatisticDescriptor minorCompactionErrors = tf.createLongCounter("minorCompactionErrors", "The total number of minor compaction errors", "errors");
+
+ StatisticDescriptor majorCompactionCount = tf.createLongCounter("majorCompactions", "The total number of major compaction operations", "ops");
+ StatisticDescriptor majorCompactionInProgress = tf.createLongGauge("majorCompactionsInProgress", "The number of major compaction operations in progress", "ops");
+ StatisticDescriptor majorCompactionTime = tf.createLongCounter("majorCompactionTime", "The total time spent in major compactions", "nanoseconds");
+ StatisticDescriptor majorCompactionBytes = tf.createLongCounter("majorCompactionBytes", "The total number of bytes collected during major compactions", "bytes");
+ StatisticDescriptor majorCompactionErrors = tf.createLongCounter("majorCompactionErrors", "The total number of major compaction errors", "errors");
+
+ StatisticDescriptor bloomCount = tf.createLongCounter("bloomFilterCheck", "The total number of Bloom Filter checks", "ops");
+ StatisticDescriptor bloomInProgress = tf.createLongGauge("bloomFilterChecksInProgress", "The number of Bloom Filter checks in progress", "ops");
+ StatisticDescriptor bloomTime = tf.createLongCounter("bloomFilterCheckTime", "The total time spent checking the Bloom Filter", "nanoseconds");
+ StatisticDescriptor bloomErrors = tf.createLongCounter("bloomFilterErrors", "The total number of Bloom Filter errors", "errors");
+ StatisticDescriptor bloomFalsePositive = tf.createLongCounter("bloomFilterFalsePositives", "The total number of Bloom Filter false positives", "false positives");
+
+ StatisticDescriptor clearCount = tf.createLongCounter("clears", "The total number of clear operations", "ops");
+ StatisticDescriptor clearInProgress = tf.createLongGauge("clearsInProgress", "The number of clear operations in progress", "ops");
+ StatisticDescriptor clearTime = tf.createLongCounter("clearTime", "The total time spent in clear operations", "nanoseconds");
+ StatisticDescriptor clearErrors = tf.createLongGauge("clearErrors", "The total number of clear errors", "errors");
+
+ StatisticDescriptor destroyCount = tf.createLongCounter("destroys", "The total number of destroy operations", "ops");
+ StatisticDescriptor destroyInProgress = tf.createLongGauge("destroysInProgress", "The number of destroy operations in progress", "ops");
+ StatisticDescriptor destroyTime = tf.createLongCounter("destroyTime", "The total time spent in destroy operations", "nanoseconds");
+ StatisticDescriptor destroyErrors = tf.createLongGauge("destroyErrors", "The total number of destroy errors", "errors");
+
+ StatisticDescriptor brCount = tf.createLongCounter("blockReads", "The total number of block read operations", "ops");
+ StatisticDescriptor brInProgress = tf.createLongGauge("blockReadsInProgress", "The number of block read operations in progress", "ops");
+ StatisticDescriptor brTime = tf.createLongCounter("blockReadTime", "The total time spent reading blocks from disk", "nanoseconds");
+ StatisticDescriptor brBytes = tf.createLongCounter("blockReadBytes", "The total number of block bytes read from disk", "bytes");
+ StatisticDescriptor brErrors = tf.createLongCounter("blockReadErrors", "The total number of block read errors", "errors");
+
+ StatisticDescriptor bcMisses = tf.createLongCounter("blockCacheMisses", "The total number of block cache misses", "misses");
+ StatisticDescriptor bcHits = tf.createLongCounter("blockCacheHits", "The total number of block cache hits", "hits");
+ StatisticDescriptor bcCached = tf.createLongGauge("blocksCached", "The current number of cached blocks", "blocks");
+ StatisticDescriptor bcBytesCached = tf.createLongGauge("blockBytesCached", "The current number of bytes cached", "bytes");
+ StatisticDescriptor bcBytesEvicted = tf.createLongCounter("blockBytesEvicted", "The total number of bytes cached", "bytes");
+
+ StatisticDescriptor activeFileCount = tf.createLongGauge("activeFileCount", "The total number of active files", "files");
+ StatisticDescriptor inactiveFileCount = tf.createLongGauge("inactiveFileCount", "The total number of inactive files", "files");
+ StatisticDescriptor activeReaderCount = tf.createLongGauge("activeReaderCount", "The total number of active file readers", "files");
+
+ StatisticDescriptor storeUsageBytes = tf.createLongGauge("storeUsageBytes", "The total volume occupied on persistent store", "bytes");
+
+ StatisticsType type = tf.createType(typeName,
+ "Statistics about structured I/O operations for a region", new StatisticDescriptor[] {
+ readCount, readInProgress, readTime, readBytes, readErrors,
+ scanCount, scanInProgress, scanTime, scanBytes, scanErrors, scanIterations, scanIterationTime,
+ writeCount, writeInProgress, writeTime, writeBytes, writeErrors,
+ putCount, putInProgress, putTime, putBytes, putErrors,
+ flushCount, flushInProgress, flushTime, flushBytes, flushErrors,
+ minorCompactionCount, minorCompactionInProgress, minorCompactionTime, minorCompactionBytes, minorCompactionErrors,
+ majorCompactionCount, majorCompactionInProgress, majorCompactionTime, majorCompactionBytes, majorCompactionErrors,
+ bloomCount, bloomInProgress, bloomTime, bloomErrors, bloomFalsePositive,
+ clearCount, clearInProgress, clearTime, clearErrors,
+ destroyCount, destroyInProgress, destroyTime, destroyErrors,
+ brCount, brInProgress, brTime, brBytes, brErrors,
+ bcMisses, bcHits, bcCached, bcBytesCached, bcBytesEvicted,
+ activeFileCount, inactiveFileCount, activeReaderCount, storeUsageBytes
+ });
+
+ read = new IOOperation(readCount.getId(), readInProgress.getId(), readTime.getId(), readBytes.getId(), readErrors.getId());
+ scan = new ScanOperation(scanCount.getId(), scanInProgress.getId(), scanTime.getId(), scanBytes.getId(), scanErrors.getId(), scanIterations.getId(), scanIterationTime.getId());
+ write = new IOOperation(writeCount.getId(), writeInProgress.getId(), writeTime.getId(), writeBytes.getId(), writeErrors.getId());
+ put = new IOOperation(putCount.getId(), putInProgress.getId(), putTime.getId(), putBytes.getId(), putErrors.getId());
+ flush = new IOOperation(flushCount.getId(), flushInProgress.getId(), flushTime.getId(), flushBytes.getId(), flushErrors.getId());
+ minorCompaction = new IOOperation(minorCompactionCount.getId(), minorCompactionInProgress.getId(), minorCompactionTime.getId(), minorCompactionBytes.getId(), minorCompactionErrors.getId());
+ majorCompaction = new IOOperation(majorCompactionCount.getId(), majorCompactionInProgress.getId(), majorCompactionTime.getId(), majorCompactionBytes.getId(), majorCompactionErrors.getId());
+ bloom = new BloomOperation(bloomCount.getId(), bloomInProgress.getId(), bloomTime.getId(), bloomErrors.getId(), bloomFalsePositive.getId());
+ clear = new TimedOperation(clearCount.getId(), clearInProgress.getId(), clearTime.getId(), clearErrors.getId());
+ destroy = new TimedOperation(destroyCount.getId(), destroyInProgress.getId(), destroyTime.getId(), destroyErrors.getId());
+
+ blockRead = new IOOperation(brCount.getId(), brInProgress.getId(), brTime.getId(), brBytes.getId(), brErrors.getId());
+ blockCache = new CacheOperation(bcMisses.getId(), bcHits.getId(), bcCached.getId(), bcBytesCached.getId(), bcBytesEvicted.getId());
+
+ activeFilesId = activeFileCount.getId();
+ inactiveFilesId = inactiveFileCount.getId();
+ activeReadersId = activeReaderCount.getId();
+ storeUsageBytesId = storeUsageBytes.getId();
+
+ stats = factory.createAtomicStatistics(type, name);
+ }
+
+ public void close() {
+ stats.close();
+ }
+
+ public Statistics getStats() {
+ return stats;
+ }
+
+ public IOOperation getRead() {
+ return read;
+ }
+
+ public ScanOperation getScan() {
+ return scan;
+ }
+
+ public IOOperation getWrite() {
+ return write;
+ }
+
+ public IOOperation getPut() {
+ return put;
+ }
+
+ public IOOperation getFlush() {
+ return flush;
+ }
+
+ public IOOperation getMinorCompaction() {
+ return minorCompaction;
+ }
+
+ public IOOperation getMajorCompaction() {
+ return majorCompaction;
+ }
+
+ public BloomOperation getBloom() {
+ return bloom;
+ }
+
+ public TimedOperation getClear() {
+ return clear;
+ }
+
+ public TimedOperation getDestroy() {
+ return destroy;
+ }
+
+ public IOOperation getBlockRead() {
+ return blockRead;
+ }
+
+ public CacheOperation getBlockCache() {
+ return blockCache;
+ }
+
+ public long getActiveFileCount() {
+ return stats.getLong(activeFilesId);
+ }
+
+ public long getInactiveFileCount() {
+ return stats.getLong(inactiveFilesId);
+ }
+
+ public long getActiveReaderCount() {
+ return stats.getLong(activeReadersId);
+ }
+
+ public void incActiveFiles(int amt) {
+ stats.incLong(activeFilesId, amt);
+ assert stats.getLong(activeFilesId) >= 0;
+ }
+
+ public void incInactiveFiles(int amt) {
+ stats.incLong(inactiveFilesId, amt);
+ assert stats.getLong(inactiveFilesId) >= 0;
+ }
+
+ public void incActiveReaders(int amt) {
+ stats.incLong(activeReadersId, amt);
+ assert stats.getLong(activeReadersId) >= 0;
+ }
+
+ public long getStoreUsageBytes() {
+ return stats.getLong(storeUsageBytesId);
+ }
+
+ public void incStoreUsageBytes(long amt) {
+ stats.incLong(storeUsageBytesId, amt);
+ assert stats.getLong(storeUsageBytesId) >= 0;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("read = {").append(read).append("}\n");
+ sb.append("scan = {").append(scan).append("}\n");
+ sb.append("write = {").append(write).append("}\n");
+ sb.append("put = {").append(put).append("}\n");
+ sb.append("flush = {").append(flush).append("}\n");
+ sb.append("minorCompaction = {").append(minorCompaction).append("}\n");
+ sb.append("majorCompaction = {").append(majorCompaction).append("}\n");
+ sb.append("bloom = {").append(bloom).append("}\n");
+ sb.append("clear = {").append(clear).append("}\n");
+ sb.append("destroy = {").append(destroy).append("}\n");
+ sb.append("blockRead = {").append(blockRead).append("}\n");
+ sb.append("blockCache = {").append(blockCache).append("}\n");
+ sb.append("activeFiles = ").append(stats.getLong(activeFilesId)).append("\n");
+ sb.append("inactiveFiles = ").append(stats.getLong(inactiveFilesId)).append("\n");
+ sb.append("activeReaders = ").append(stats.getLong(activeReadersId)).append("\n");
+ sb.append("storeUsageBytes = ").append(stats.getLong(storeUsageBytesId)).append("\n");
+
+ return sb.toString();
+ }
+
+ public class TimedOperation {
+ protected final int countId;
+ protected final int inProgressId;
+ protected final int timeId;
+ private final int errorsId;
+
+ public TimedOperation(int count, int inProgress, int time, int errors) {
+ this.countId = count;
+ this.inProgressId = inProgress;
+ this.timeId = time;
+ this.errorsId = errors;
+ }
+
+ public long begin() {
+ stats.incLong(inProgressId, 1);
+ return getStatTime();
+ }
+
+ public long end(long start) {
+ stats.incLong(inProgressId, -1);
+ stats.incLong(countId, 1);
+ stats.incLong(timeId, getStatTime() - start);
+ return getStatTime();
+ }
+
+ public void error(long start) {
+ end(start);
+ stats.incLong(errorsId, 1);
+ }
+
+ public long getCount() {
+ return stats.getLong(countId);
+ }
+
+ public long getInProgress() {
+ return stats.getLong(inProgressId);
+ }
+
+ public long getTime() {
+ return stats.getLong(timeId);
+ }
+
+ public long getErrors() {
+ return stats.getLong(errorsId);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("count=").append(getCount());
+ sb.append(";inProgress=").append(getInProgress());
+ sb.append(";errors=").append(getErrors());
+ sb.append(";time=").append(getTime());
+
+ return sb.toString();
+ }
+ }
+
+ public class IOOperation extends TimedOperation {
+ protected final int bytesId;
+
+ public IOOperation(int count, int inProgress, int time, int bytes, int errors) {
+ super(count, inProgress, time, errors);
+ this.bytesId = bytes;
+ }
+
+ public long end(long bytes, long start) {
+ stats.incLong(bytesId, bytes);
+ return super.end(start);
+ }
+
+ public long getBytes() {
+ return stats.getLong(bytesId);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder(super.toString());
+ sb.append(";bytes=").append(getBytes());
+
+ return sb.toString();
+ }
+ }
+
+ public class ScanOperation extends IOOperation {
+ private final int iterationsId;
+ private final int iterationTimeId;
+
+ public ScanOperation(int count, int inProgress, int time, int bytes, int errors, int iterCount, int iterTime) {
+ super(count, inProgress, time, bytes, errors);
+ iterationsId = iterCount;
+ iterationTimeId = iterTime;
+ }
+
+ public long beginIteration() {
+ return getStatTime();
+ }
+
+ public void endIteration(long bytes, long start){
+ stats.incLong(iterationsId, 1);
+ stats.incLong(bytesId, bytes);
+ stats.incLong(iterationTimeId, getStatTime() - start);
+ }
+
+ public long getIterations() {
+ return stats.getLong(iterationsId);
+ }
+
+ public long getIterationTime() {
+ return stats.getLong(iterationTimeId);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder(super.toString());
+ sb.append(";iterations=").append(getIterations());
+ sb.append(";iterationTime=").append(getIterationTime());
+
+ return sb.toString();
+ }
+ }
+
+ public class BloomOperation extends TimedOperation {
+ private final int falsePositiveId;
+
+ public BloomOperation(int count, int inProgress, int time, int errors, int falsePositive) {
+ super(count, inProgress, time, errors);
+ this.falsePositiveId = falsePositive;
+ }
+
+ public void falsePositive() {
+ stats.incLong(falsePositiveId, 1);
+ }
+
+ public long getFalsePositives() {
+ return stats.getLong(falsePositiveId);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder(super.toString());
+ sb.append(";falsePositives=").append(getFalsePositives());
+
+ return sb.toString();
+ }
+ }
+
+ public class CacheOperation {
+ private final int missesId;
+ private final int hitsId;
+ private final int cachedId;
+ private final int bytesCachedId;
+ private final int bytesEvictedId;
+
+ public CacheOperation(int missesId, int hitsId, int cachedId,
+ int bytesCachedId, int bytesEvictedId) {
+ this.missesId = missesId;
+ this.hitsId = hitsId;
+ this.cachedId = cachedId;
+ this.bytesCachedId = bytesCachedId;
+ this.bytesEvictedId = bytesEvictedId;
+ }
+
+ public void store(long bytes) {
+ stats.incLong(cachedId, 1);
+ stats.incLong(bytesCachedId, bytes);
+ }
+
+ public void evict(long bytes) {
+ stats.incLong(cachedId, -1);
+ stats.incLong(bytesCachedId, -bytes);
+ stats.incLong(bytesEvictedId, bytes);
+ }
+
+ public void hit() {
+ stats.incLong(hitsId, 1);
+ }
+
+ public void miss() {
+ stats.incLong(missesId, 1);
+ }
+
+ public long getMisses() {
+ return stats.getLong(missesId);
+ }
+
+ public long getHits() {
+ return stats.getLong(hitsId);
+ }
+
+ public long getCached() {
+ return stats.getLong(cachedId);
+ }
+
+ public long getBytesCached() {
+ return stats.getLong(bytesCachedId);
+ }
+
+ public long getBytesEvicted() {
+ return stats.getLong(bytesEvictedId);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("misses=").append(getMisses());
+ sb.append(";hits=").append(getHits());
+ sb.append(";cached=").append(getCached());
+ sb.append(";bytesCached=").append(getBytesCached());
+ sb.append(";bytesEvicted=").append(getBytesEvicted());
+
+ return sb.toString();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedReader.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedReader.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedReader.java
new file mode 100644
index 0000000..1042e22
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedReader.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.persistence.soplog;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import org.apache.hadoop.io.RawComparator;
+
+/**
+ * Defines a means to read sorted data including performing range scans.
+ *
+ * @param <V> type of value returned by the sorted reader
+ *
+ */
+public interface SortedReader<V> extends Closeable {
+ /**
+ * Defines the names of additional data that may be associated with a sorted
+ * reader.
+ */
+ public enum Metadata {
+ /** identifies the disk store associated with the soplog, optional */
+ DISK_STORE,
+
+ /** identifies the RVV data, optional */
+ RVV;
+
+ /**
+ * Converts the metadata name to bytes.
+ * @return the bytes
+ */
+ public byte[] bytes() {
+ return ("gemfire." + name()).getBytes();
+ }
+ }
+
+ /**
+ * Filters data based on metadata values.
+ */
+ public interface MetadataFilter {
+ /**
+ * Returns the name this filter acts upon.
+ * @return the name
+ */
+ Metadata getName();
+
+ /**
+ * Returns true if the metadata value passes the filter.
+ * @param value the value to check; may be null if the metadata value does
+ * not exist or has not been assigned yet
+ * @return true if accepted
+ */
+ boolean accept(byte[] value);
+ }
+
+ /**
+ * Allows comparisons between serialized objects.
+ */
+ public interface SerializedComparator extends RawComparator<byte[]> {
+ }
+
+ /**
+ * Allows sorted iteration through a set of keys and values.
+ */
+ public interface SortedIterator<V> extends KeyValueIterator<ByteBuffer, V> {
+ /**
+ * Closes the iterator and frees any retained resources.
+ */
+ public abstract void close();
+ }
+
+ /**
+ * Defines the statistics available on a sorted file.
+ */
+ public interface SortedStatistics {
+ /**
+ * Returns the number of keys in the file.
+ * @return the key count
+ */
+ long keyCount();
+
+ /**
+ * Returns the first key in the file.
+ * @return the first key
+ */
+ byte[] firstKey();
+
+ /**
+ * Returns the last key in the file.
+ * @return the last key
+ */
+ byte[] lastKey();
+
+ /**
+ * Returns the average key size in bytes.
+ * @return the average key size
+ */
+ double avgKeySize();
+
+ /**
+ * Returns the average value size in bytes.
+ * @return the average value size
+ */
+ double avgValueSize();
+
+ /**
+ * Frees any resources held by for statistics generation.
+ */
+ void close();
+ }
+
+ /**
+ * Returns true if the bloom filter might contain the supplied key. The
+ * nature of the bloom filter is such that false positives are allowed, but
+ * false negatives cannot occur.
+ *
+ * @param key the key to test
+ * @return true if the key might be present
+ * @throws IOException read error
+ */
+ boolean mightContain(byte[] key) throws IOException;
+
+ /**
+ * Returns the value associated with the given key.
+ *
+ * @param key the key
+ * @return the value, or null if the key is not found
+ * @throws IOException read error
+ */
+ V read(byte[] key) throws IOException;
+
+ /**
+ * Iterates from the first key in the file to the requested key.
+ * @param to the ending key
+ * @param inclusive true if the ending key is included in the iteration
+ * @return the sorted iterator
+ * @throws IOException scan error
+ */
+ SortedIterator<V> head(byte[] to, boolean inclusive) throws IOException;
+
+ /**
+ * Iterates from the requested key to the last key in the file.
+ * @param from the starting key
+ * @param inclusive true if the starting key should be included in the iteration
+ * @return the sorted iterator
+ * @throws IOException scan error
+ */
+ SortedIterator<V> tail(byte[] from, boolean inclusive) throws IOException;
+
+ /**
+ * Iterators over the entire contents of the sorted file.
+ *
+ * @return the sorted iterator
+ * @throws IOException scan error
+ */
+ SortedIterator<V> scan() throws IOException;
+
+ /**
+ * Scans the available keys and allows iteration over the interval [from, to)
+ * where the starting key is included and the ending key is excluded from
+ * the results.
+ *
+ * @param from the start key
+ * @param to the end key
+ * @return the sorted iterator
+ * @throws IOException scan error
+ */
+ SortedIterator<V> scan(byte[] from, byte[] to) throws IOException;
+
+ /**
+ * Scans the keys and returns an iterator over the interval [equalTo, equalTo].
+ *
+ * @param equalTo the key to match
+ * @return the sorted iterator
+ * @throws IOException scan error
+ */
+ SortedIterator<V> scan(byte[] equalTo) throws IOException;
+
+ /**
+ * Scans the keys and allows iteration between the given keys.
+ *
+ * @param from the start key
+ * @param fromInclusive true if the start key is included in the scan
+ * @param to the end key
+ * @param toInclusive true if the end key is included in the scan
+ * @return the sorted iterator
+ * @throws IOException scan error
+ */
+ SortedIterator<V> scan(byte[] from, boolean fromInclusive,
+ byte[] to, boolean toInclusive) throws IOException;
+
+ /**
+ * Scans the keys and allows iteration between the given keys after applying
+ * the metdata filter and the order flag. These parameters override values
+ * configured using <code>withAscending</code> or <code>withFilter</code>.
+ *
+ * @param from the start key
+ * @param fromInclusive true if the start key is included in the scan
+ * @param to the end key
+ * @param toInclusive true if the end key is included in the scan
+ * @param ascending true if ascending
+ * @param filter filters data based on metadata values
+ * @return the sorted iterator
+ * @throws IOException scan error
+ */
+ SortedIterator<V> scan(
+ byte[] from, boolean fromInclusive,
+ byte[] to, boolean toInclusive,
+ boolean ascending,
+ MetadataFilter filter) throws IOException;
+
+ /**
+ * Changes the iteration order of subsequent operations.
+ *
+ * @param ascending true if ascending order (default)
+ * @return the reader
+ */
+ SortedReader<V> withAscending(boolean ascending);
+
+ /**
+ * Applies a metadata filter to subsequent operations.
+ *
+ * @param filter the filter to apply
+ * @return the reader
+ */
+ SortedReader<V> withFilter(MetadataFilter filter);
+
+ /**
+ * Returns the comparator used for sorting keys.
+ * @return the comparator
+ */
+ SerializedComparator getComparator();
+
+ /**
+ * Returns the statistics regarding the keys present in the sorted file.
+ * @return the statistics
+ * @throws IOException unable retrieve statistics
+ */
+ SortedStatistics getStatistics() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/TrackedReference.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/TrackedReference.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/TrackedReference.java
new file mode 100644
index 0000000..2934f07
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/TrackedReference.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.persistence.soplog;
+
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Tracks the usage of a reference.
+ *
+ *
+ * @param <T> the reference type
+ */
+public final class TrackedReference<T> {
+ /** the referent */
+ private final T ref;
+
+ /** the number of uses */
+ private final AtomicInteger uses;
+
+ /** list of users using this reference. Mainly for debugging */
+ final ConcurrentHashMap<String, AtomicInteger> users;
+
+ /**
+ * Decrements the use count of each reference.
+ * @param refs the references to decrement
+ */
+ public static <T> void decrementAll(Iterable<TrackedReference<T>> refs) {
+ for (TrackedReference<?> tr : refs) {
+ tr.decrement();
+ }
+ }
+
+ public TrackedReference(T ref) {
+ this.ref = ref;
+ uses = new AtomicInteger(0);
+ users = new ConcurrentHashMap<String, AtomicInteger>();
+ }
+
+ /**
+ * Returns the referent.
+ * @return the referent
+ */
+ public final T get() {
+ return ref;
+ }
+
+ /**
+ * Returns the current count.
+ * @return the current uses
+ */
+ public int uses() {
+ return uses.get();
+ }
+
+ /**
+ * Returns true if the reference is in use.
+ * @return true if used
+ */
+ public boolean inUse() {
+ return uses() > 0;
+ }
+
+ /**
+ * Increments the use count and returns the reference.
+ * @return the reference
+ */
+ public T getAndIncrement() {
+ increment();
+ return ref;
+ }
+
+ /**
+ * Increments the use counter and returns the current count.
+ * @return the current uses
+ */
+ public int increment() {
+ return increment(null);
+ }
+
+ /**
+ * Increments the use counter and returns the current count.
+ * @return the current uses
+ */
+ public int increment(String user) {
+ int val = uses.incrementAndGet();
+ if (user != null) {
+ AtomicInteger counter = users.get(user);
+ if (counter == null) {
+ counter = new AtomicInteger();
+ users.putIfAbsent(user, counter);
+ counter = users.get(user);
+ }
+ counter.incrementAndGet();
+ }
+ assert val >= 1;
+
+ return val;
+ }
+
+ /**
+ * Decrements the use counter and returns the current count.
+ * @return the current uses
+ */
+ public int decrement() {
+ return decrement(null);
+ }
+
+ /**
+ * Decrements the use counter and returns the current count.
+ * @return the current uses
+ */
+ public int decrement(String user) {
+ int val = uses.decrementAndGet();
+ assert val >= 0;
+ if (user != null) {
+ AtomicInteger counter = users.get(user);
+ if (counter != null) {
+ counter.decrementAndGet();
+ }
+ }
+
+ return val;
+ }
+
+ @Override
+ public String toString() {
+ if (users != null) {
+ StringBuffer sb = new StringBuffer();
+ sb.append(ref.toString()).append(": ").append(uses());
+ for (Entry<String, AtomicInteger> user : users.entrySet()) {
+ sb.append(" ").append(user.getKey()).append(":").append(user.getValue().intValue());
+ }
+ return sb.toString();
+ }
+ return uses() + ": " + ref.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java
index ca7818a..e6c07d9 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java
@@ -1145,7 +1145,7 @@ public abstract class BaseCommand implements Command {
VersionTagHolder versionHolder = new VersionTagHolder();
ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID();
// From Get70.getValueAndIsObject()
- Object data = region.get(entryKey, null, true, true, true, id, versionHolder, true);
+ Object data = region.get(entryKey, null, true, true, true, id, versionHolder, true, false);
VersionTag vt = versionHolder.getVersionTag();
updateValues(values, entryKey, data, vt);
@@ -1252,7 +1252,7 @@ public abstract class BaseCommand implements Command {
}
ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID();
- data = region.get(key, null, true, true, true, id, versionHolder, true);
+ data = region.get(key, null, true, true, true, id, versionHolder, true, false);
versionTag = versionHolder.getVersionTag();
updateValues(values, key, data, versionTag);
@@ -1345,7 +1345,7 @@ public abstract class BaseCommand implements Command {
key = it.next();
versionHolder = new VersionTagHolder();
- Object value = region.get(key, null, true, true, true, requestingClient, versionHolder, true);
+ Object value = region.get(key, null, true, true, true, requestingClient, versionHolder, true, false);
updateValues(values, key, value, versionHolder.getVersionTag());
@@ -1548,7 +1548,7 @@ public abstract class BaseCommand implements Command {
ClientProxyMembershipID id = servConn == null ? null : servConn
.getProxyID();
data = region.get(key, null, true, true, true, id, versionHolder,
- true);
+ true, false);
versionTag = versionHolder.getVersionTag();
updateValues(values, key, data, versionTag);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Get70.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Get70.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Get70.java
index 55047c7..7898b3c 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Get70.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Get70.java
@@ -24,6 +24,7 @@ import com.gemstone.gemfire.cache.client.internal.GetOp;
import com.gemstone.gemfire.cache.operations.GetOperationContext;
import com.gemstone.gemfire.cache.operations.internal.GetOperationContextImpl;
import com.gemstone.gemfire.distributed.internal.DistributionStats;
+import com.gemstone.gemfire.internal.Assert;
import com.gemstone.gemfire.internal.cache.CachedDeserializable;
import com.gemstone.gemfire.internal.cache.EntryEventImpl;
import com.gemstone.gemfire.internal.cache.LocalRegion;
@@ -304,7 +305,7 @@ public class Get70 extends BaseCommand {
// } else {
ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID();
VersionTagHolder versionHolder = new VersionTagHolder();
- data = ((LocalRegion) region).get(key, callbackArg, true, true, true, id, versionHolder, true);
+ data = ((LocalRegion) region).get(key, callbackArg, true, true, true, id, versionHolder, true, true /*allowReadFromHDFS*/);
// }
versionTag = versionHolder.getVersionTag();
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Request.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Request.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Request.java
index 69d54a1..2a617a8 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Request.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Request.java
@@ -242,7 +242,7 @@ public class Request extends BaseCommand {
boolean isObject = true;
ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID();
- Object data = ((LocalRegion) region).get(key, callbackArg, true, true, true, id, null, false);
+ Object data = ((LocalRegion) region).get(key, callbackArg, true, true, true, id, null, false, true/*allowReadFromHDFS*/);
// If the value in the VM is a CachedDeserializable,
// get its value. If it is Token.REMOVED, Token.DESTROYED,
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/ClientTXRegionStub.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/ClientTXRegionStub.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/ClientTXRegionStub.java
index 90522b2..e896649 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/ClientTXRegionStub.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/ClientTXRegionStub.java
@@ -67,8 +67,8 @@ public class ClientTXRegionStub implements TXRegionStub {
public Object findObject(KeyInfo keyInfo, boolean isCreate,
- boolean generateCallbacks, Object value, boolean preferCD,
- ClientProxyMembershipID requestingClient, EntryEventImpl event) {
+ boolean generateCallbacks, Object value, boolean preferCD,
+ ClientProxyMembershipID requestingClient, EntryEventImpl event, boolean allowReadFromHDFS) {
return proxy.get(keyInfo.getKey(), keyInfo.getCallbackArg(), event);
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/DistributedTXRegionStub.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/DistributedTXRegionStub.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/DistributedTXRegionStub.java
index 1637c4a..7c7df53 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/DistributedTXRegionStub.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/DistributedTXRegionStub.java
@@ -17,10 +17,12 @@
package com.gemstone.gemfire.internal.cache.tx;
import java.util.Collections;
+import java.util.Map;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.EntryNotFoundException;
import com.gemstone.gemfire.cache.RegionDestroyedException;
+import com.gemstone.gemfire.cache.RemoteTransactionException;
import com.gemstone.gemfire.cache.TransactionDataNodeHasDepartedException;
import com.gemstone.gemfire.cache.TransactionDataNotColocatedException;
import com.gemstone.gemfire.cache.TransactionException;
@@ -30,6 +32,7 @@ import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedM
import com.gemstone.gemfire.internal.cache.DistributedPutAllOperation;
import com.gemstone.gemfire.internal.cache.DistributedRemoveAllOperation;
import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.KeyInfo;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.PartitionedRegionException;
@@ -51,6 +54,7 @@ import com.gemstone.gemfire.internal.cache.partitioned.RemoteSizeMessage;
import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.util.concurrent.StoppableReentrantReadWriteLock;
public class DistributedTXRegionStub extends AbstractPeerTXRegionStub {
@@ -155,13 +159,9 @@ public class DistributedTXRegionStub extends AbstractPeerTXRegionStub {
}
- public Object findObject(KeyInfo keyInfo,
- boolean isCreate,
- boolean generateCallbacks,
- Object value,
- boolean preferCD,
- ClientProxyMembershipID requestingClient,
- EntryEventImpl clientEvent) {
+ public Object findObject(KeyInfo keyInfo, boolean isCreate,
+ boolean generateCallbacks, Object value, boolean preferCD,
+ ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean allowReadFromHDFS) {
Object retVal = null;
final Object key = keyInfo.getKey();
final Object callbackArgument = keyInfo.getCallbackArg();
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/PartitionedTXRegionStub.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/PartitionedTXRegionStub.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/PartitionedTXRegionStub.java
index 01b1ed8..6723646 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/PartitionedTXRegionStub.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/PartitionedTXRegionStub.java
@@ -275,15 +275,15 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub {
public Object findObject(KeyInfo keyInfo, boolean isCreate,
- boolean generateCallbacks, Object value, boolean peferCD,
- ClientProxyMembershipID requestingClient,
- EntryEventImpl clientEvent) {
+ boolean generateCallbacks, Object value, boolean peferCD,
+ ClientProxyMembershipID requestingClient,
+ EntryEventImpl clientEvent, boolean allowReadFromHDFS) {
Object retVal = null;
final Object key = keyInfo.getKey();
final Object callbackArgument = keyInfo.getCallbackArg();
PartitionedRegion pr = (PartitionedRegion)region;
try {
- retVal = pr.getRemotely((InternalDistributedMember)state.getTarget(), keyInfo.getBucketId(), key, callbackArgument, peferCD, requestingClient, clientEvent, false);
+ retVal = pr.getRemotely((InternalDistributedMember)state.getTarget(), keyInfo.getBucketId(), key, callbackArgument, peferCD, requestingClient, clientEvent, false, allowReadFromHDFS);
} catch (TransactionException e) {
RuntimeException re = getTransactionException(keyInfo, e);
re.initCause(e.getCause());
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/TXRegionStub.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/TXRegionStub.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/TXRegionStub.java
index f2859f1..482882f 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/TXRegionStub.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/TXRegionStub.java
@@ -42,8 +42,8 @@ public interface TXRegionStub {
boolean containsValueForKey(KeyInfo keyInfo);
Object findObject(KeyInfo keyInfo, boolean isCreate,
- boolean generateCallbacks, Object value, boolean preferCD,
- ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent);
+ boolean generateCallbacks, Object value, boolean preferCD,
+ ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean allowReadFromHDFS);
Object getEntryForIterator(KeyInfo keyInfo, boolean allowTombstone);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
index fe09d03..94524bd 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
@@ -157,6 +157,8 @@ public abstract class AbstractGatewaySender implements GatewaySender,
protected boolean isBucketSorted;
+ protected boolean isHDFSQueue;
+
protected boolean isMetaQueue;
private int parallelismForReplicatedRegion;
@@ -258,6 +260,7 @@ public abstract class AbstractGatewaySender implements GatewaySender,
this.maxMemoryPerDispatcherQueue = this.queueMemory / this.dispatcherThreads;
this.myDSId = InternalDistributedSystem.getAnyInstance().getDistributionManager().getDistributedSystemId();
this.serialNumber = DistributionAdvisor.createSerialNumber();
+ this.isHDFSQueue = attrs.isHDFSQueue();
this.isMetaQueue = attrs.isMetaQueue();
if (!(this.cache instanceof CacheCreation)) {
this.stopper = new Stopper(cache.getCancelCriterion());
@@ -266,7 +269,8 @@ public abstract class AbstractGatewaySender implements GatewaySender,
this.statistics = new GatewaySenderStats(cache.getDistributedSystem(),
id);
}
- initializeEventIdIndex();
+ if (!attrs.isHDFSQueue())
+ initializeEventIdIndex();
}
this.isBucketSorted = attrs.isBucketSorted();
}
@@ -314,10 +318,12 @@ public abstract class AbstractGatewaySender implements GatewaySender,
cache.getDistributedSystem(), AsyncEventQueueImpl
.getAsyncEventQueueIdFromSenderId(id));
}
- initializeEventIdIndex();
+ if (!attrs.isHDFSQueue())
+ initializeEventIdIndex();
}
this.isBucketSorted = attrs.isBucketSorted();
-
+ this.isHDFSQueue = attrs.isHDFSQueue();
+
}
public GatewaySenderAdvisor getSenderAdvisor() {
@@ -476,6 +482,10 @@ public abstract class AbstractGatewaySender implements GatewaySender,
return this.isBucketSorted;
}
+ public boolean getIsHDFSQueue() {
+ return this.isHDFSQueue;
+ }
+
public boolean getIsMetaQueue() {
return this.isMetaQueue;
}
@@ -853,6 +863,12 @@ public abstract class AbstractGatewaySender implements GatewaySender,
return;
}
+ if (getIsHDFSQueue() && event.getOperation().isEviction()) {
+ if (logger.isDebugEnabled())
+ logger.debug("Eviction event not queued: " + event);
+ stats.incEventsNotQueued();
+ return;
+ }
// this filter is defined by Asif which exist in old wan too. new wan has
// other GatewaEventFilter. Do we need to get rid of this filter. Cheetah is
// not cinsidering this filter
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java
index 1cef940..025616d 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java
@@ -30,6 +30,7 @@ import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
public class GatewaySenderAttributes {
public static final boolean DEFAULT_IS_BUCKETSORTED = true;
+ public static final boolean DEFAULT_IS_HDFSQUEUE = false;
public static final boolean DEFAULT_IS_META_QUEUE = false;
@@ -81,6 +82,7 @@ public class GatewaySenderAttributes {
public boolean isBucketSorted = GatewaySenderAttributes.DEFAULT_IS_BUCKETSORTED;
+ public boolean isHDFSQueue = GatewaySenderAttributes.DEFAULT_IS_HDFSQUEUE;
public boolean isMetaQueue = GatewaySenderAttributes.DEFAULT_IS_META_QUEUE;
public int getSocketBufferSize() {
@@ -189,6 +191,9 @@ public class GatewaySenderAttributes {
public GatewayEventSubstitutionFilter getGatewayEventSubstitutionFilter() {
return this.eventSubstitutionFilter;
}
+ public boolean isHDFSQueue() {
+ return this.isHDFSQueue;
+ }
public boolean isMetaQueue() {
return this.isMetaQueue;
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
index 07a3be5..b63c7cb 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
@@ -36,6 +36,9 @@ import com.gemstone.gemfire.InternalGemFireException;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.EntryEvent;
import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSGatewayEventImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSParallelGatewaySenderQueue;
import com.gemstone.gemfire.cache.wan.GatewayQueueEvent;
import com.gemstone.gemfire.internal.cache.EntryEventImpl;
import com.gemstone.gemfire.internal.cache.EnumListenerEvent;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
index f995ba4..8524ccf 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
@@ -22,6 +22,8 @@ package com.gemstone.gemfire.internal.cache.wan.parallel;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.CacheListener;
import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSGatewayEventImpl;
import com.gemstone.gemfire.internal.cache.Conflatable;
import com.gemstone.gemfire.internal.cache.DistributedRegion;
import com.gemstone.gemfire.internal.cache.ForceReattemptException;
@@ -186,6 +188,11 @@ public class ConcurrentParallelGatewaySenderQueue implements RegionQueue {
getPGSProcessor( bucketId).notifyEventProcessorIfRequired(bucketId);
}
+ public HDFSBucketRegionQueue getBucketRegionQueue(PartitionedRegion region,
+ int bucketId) throws ForceReattemptException {
+ return getPGSProcessor(bucketId).getBucketRegionQueue(region, bucketId);
+ }
+
public void clear(PartitionedRegion pr, int bucketId) {
getPGSProcessor(bucketId).clear(pr, bucketId);
}
@@ -200,6 +207,11 @@ public class ConcurrentParallelGatewaySenderQueue implements RegionQueue {
getPGSProcessor(bucketId).conflateEvent(conflatableObject, bucketId, tailKey);
}
+ public HDFSGatewayEventImpl get(PartitionedRegion region, byte[] regionKey,
+ int bucketId) throws ForceReattemptException {
+ return getPGSProcessor(bucketId).get(region, regionKey, bucketId);
+ }
+
public void addShadowPartitionedRegionForUserRR(DistributedRegion userRegion) {
for(int i =0; i< processors.length; i++){
processors[i].addShadowPartitionedRegionForUserRR(userRegion);;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
index 11502af..417ba13 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
@@ -28,6 +28,9 @@ import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.EntryEvent;
import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSGatewayEventImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSParallelGatewaySenderQueue;
import com.gemstone.gemfire.cache.wan.GatewayQueueEvent;
import com.gemstone.gemfire.internal.cache.Conflatable;
import com.gemstone.gemfire.internal.cache.DistributedRegion;
@@ -101,7 +104,10 @@ public class ParallelGatewaySenderEventProcessor extends
}
ParallelGatewaySenderQueue queue;
- queue = new ParallelGatewaySenderQueue(this.sender, targetRs, this.index, this.nDispatcher);
+ if (sender.getIsHDFSQueue())
+ queue = new HDFSParallelGatewaySenderQueue(this.sender, targetRs, this.index, this.nDispatcher);
+ else
+ queue = new ParallelGatewaySenderQueue(this.sender, targetRs, this.index, this.nDispatcher);
queue.start();
this.queue = queue;
@@ -139,8 +145,12 @@ public class ParallelGatewaySenderEventProcessor extends
// while merging 42004, kept substituteValue as it is(it is barry's
// change 42466). bucketID is merged with eventID.getBucketID
+ if (!sender.getIsHDFSQueue())
gatewayQueueEvent = new GatewaySenderEventImpl(operation, event,
substituteValue, true, eventID.getBucketID());
+ else
+ gatewayQueueEvent = new HDFSGatewayEventImpl(operation,
+ event, substituteValue, true, eventID.getBucketID());
if (getSender().beforeEnqueue(gatewayQueueEvent)) {
long start = getSender().getStatistics().startTime();
@@ -198,6 +208,16 @@ public class ParallelGatewaySenderEventProcessor extends
((ParallelGatewaySenderQueue)this.queue).conflateEvent(conflatableObject, bucketId, tailKey);
}
+ public HDFSGatewayEventImpl get(PartitionedRegion region, byte[] regionKey,
+ int bucketId) throws ForceReattemptException {
+ return ((HDFSParallelGatewaySenderQueue)this.queue).get(region, regionKey, bucketId);
+ }
+
+ public HDFSBucketRegionQueue getBucketRegionQueue(PartitionedRegion region,
+ int bucketId) throws ForceReattemptException {
+ return ((HDFSParallelGatewaySenderQueue)this.queue).getBucketRegionQueue(region, bucketId);
+ }
+
public void addShadowPartitionedRegionForUserPR(PartitionedRegion pr) {
// TODO Auto-generated method stub
((ParallelGatewaySenderQueue)this.queue).addShadowPartitionedRegionForUserPR(pr);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 46ff263..b0b1a32 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -492,7 +492,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
if (this.userRegionNameToshadowPRMap.containsKey(regionName))
return;
- if(userPR.getDataPolicy().withPersistence() && !sender.isPersistenceEnabled()){
+ if(!isUsedForHDFS() && userPR.getDataPolicy().withPersistence() && !sender.isPersistenceEnabled()){
throw new GatewaySenderException(
LocalizedStrings.ParallelGatewaySenderQueue_NON_PERSISTENT_GATEWAY_SENDER_0_CAN_NOT_BE_ATTACHED_TO_PERSISTENT_REGION_1
.toLocalizedString(new Object[] { this.sender.getId(),
@@ -552,7 +552,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
}
ParallelGatewaySenderQueueMetaRegion meta = metaRegionFactory.newMetataRegion(cache,
- prQName, ra, sender);
+ prQName, ra, sender, isUsedForHDFS());
try {
prQ = (PartitionedRegion)cache
@@ -630,6 +630,10 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
bucketRegion.clear();
}
}
+ protected boolean isUsedForHDFS()
+ {
+ return false;
+ }
protected void afterRegionAdd (PartitionedRegion userPR) {
}
@@ -1853,12 +1857,18 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
public ParallelGatewaySenderQueueMetaRegion(String regionName,
RegionAttributes attrs, LocalRegion parentRegion,
GemFireCacheImpl cache, AbstractGatewaySender pgSender) {
+ this( regionName, attrs, parentRegion, cache, pgSender, false);
+ }
+ public ParallelGatewaySenderQueueMetaRegion(String regionName,
+ RegionAttributes attrs, LocalRegion parentRegion,
+ GemFireCacheImpl cache, AbstractGatewaySender pgSender, boolean isUsedForHDFS) {
super(regionName, attrs, parentRegion, cache,
new InternalRegionArguments().setDestroyLockFlag(true)
.setRecreateFlag(false).setSnapshotInputStream(null)
.setImageTarget(null)
.setIsUsedForParallelGatewaySenderQueue(true)
- .setParallelGatewaySender((AbstractGatewaySender)pgSender));
+ .setParallelGatewaySender((AbstractGatewaySender)pgSender)
+ .setIsUsedForHDFSParallelGatewaySenderQueue(isUsedForHDFS));
this.sender = (AbstractGatewaySender)pgSender;
}
@@ -1915,9 +1925,9 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
static class MetaRegionFactory {
ParallelGatewaySenderQueueMetaRegion newMetataRegion(
- GemFireCacheImpl cache, final String prQName, final RegionAttributes ra, AbstractGatewaySender sender) {
+ GemFireCacheImpl cache, final String prQName, final RegionAttributes ra, AbstractGatewaySender sender, boolean isUsedForHDFS) {
ParallelGatewaySenderQueueMetaRegion meta = new ParallelGatewaySenderQueueMetaRegion(
- prQName, ra, null, cache, sender);
+ prQName, ra, null, cache, sender, isUsedForHDFS);
return meta;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java
index 0015665..77f9596 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java
@@ -41,6 +41,7 @@ public class AsyncEventQueueCreation implements AsyncEventQueue {
private int maxQueueMemory = 0;
private boolean isParallel = false;
private boolean isBucketSorted = false;
+ private boolean isHDFSQueue = false;
private int dispatcherThreads = 1;
private OrderPolicy orderPolicy = OrderPolicy.KEY;
@@ -61,6 +62,7 @@ public class AsyncEventQueueCreation implements AsyncEventQueue {
this.orderPolicy = senderAttrs.policy;
this.asyncEventListener = eventListener;
this.isBucketSorted = senderAttrs.isBucketSorted;
+ this.isHDFSQueue = senderAttrs.isHDFSQueue;
this.gatewayEventSubstitutionFilter = senderAttrs.eventSubstitutionFilter;
}
@@ -211,4 +213,11 @@ public class AsyncEventQueueCreation implements AsyncEventQueue {
public void setBucketSorted(boolean isBucketSorted) {
this.isBucketSorted = isBucketSorted;
}
+ public boolean isHDFSQueue() {
+ return this.isHDFSQueue;
+ }
+
+ public void setIsHDFSQueue(boolean isHDFSQueue) {
+ this.isHDFSQueue = isHDFSQueue;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheCreation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheCreation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheCreation.java
index d52d05e..019079d 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheCreation.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheCreation.java
@@ -91,6 +91,12 @@ import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.i18n.LogWriterI18n;
import com.gemstone.gemfire.internal.Assert;
+import com.gemstone.gemfire.cache.hdfs.HDFSStore;
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSIntegrationUtil;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreCreation;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreFactoryImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
import com.gemstone.gemfire.internal.cache.CacheServerImpl;
import com.gemstone.gemfire.internal.cache.CacheConfig;
import com.gemstone.gemfire.internal.cache.CacheServerLauncher;
@@ -193,7 +199,8 @@ public class CacheCreation implements InternalCache {
* This is important for unit testing 44914.
*/
protected final Map diskStores = new LinkedHashMap();
-
+ protected final Map hdfsStores = new LinkedHashMap();
+
private final List<File> backups = new ArrayList<File>();
private CacheConfig cacheConfig = new CacheConfig();
@@ -507,6 +514,13 @@ public class CacheCreation implements InternalCache {
}
}
+ for(Iterator iter = this.hdfsStores.entrySet().iterator(); iter.hasNext(); ) {
+ Entry entry = (Entry) iter.next();
+ HDFSStoreCreation hdfsStoreCreation = (HDFSStoreCreation) entry.getValue();
+ HDFSStoreFactory storefactory = cache.createHDFSStoreFactory(hdfsStoreCreation);
+ storefactory.create((String) entry.getKey());
+ }
+
cache.initializePdxRegistry();
@@ -517,6 +531,19 @@ public class CacheCreation implements InternalCache {
(RegionAttributesCreation) getRegionAttributes(id);
creation.inheritAttributes(cache, false);
+ // TODO: HDFS: HDFS store/queue will be mapped against region path and not
+ // the attribute id; don't really understand what this is trying to do
+ if (creation.getHDFSStoreName() != null)
+ {
+ HDFSStoreImpl store = cache.findHDFSStore(creation.getHDFSStoreName());
+ if(store == null) {
+ HDFSIntegrationUtil.createDefaultAsyncQueueForHDFS((Cache)cache, creation.getHDFSWriteOnly(), id);
+ }
+ }
+ if (creation.getHDFSStoreName() != null && creation.getPartitionAttributes().getColocatedWith() == null) {
+ creation.addAsyncEventQueueId(HDFSStoreFactoryImpl.getEventQueueName(id));
+ }
+
RegionAttributes attrs;
// Don't let the RegionAttributesCreation escape to the user
AttributesFactory factory = new AttributesFactory(creation);
@@ -1395,6 +1422,27 @@ public class CacheCreation implements InternalCache {
}
@Override
+ public HDFSStoreFactory createHDFSStoreFactory() {
+ // TODO Auto-generated method stub
+ return new HDFSStoreFactoryImpl(this);
+ }
+ @Override
+ public HDFSStore findHDFSStore(String storeName) {
+ return (HDFSStore)this.hdfsStores.get(storeName);
+ }
+
+ @Override
+ public Collection<HDFSStoreImpl> getHDFSStores() {
+ return this.hdfsStores.values();
+ }
+
+ public void addHDFSStore(String name, HDFSStoreCreation hs) {
+ this.hdfsStores.put(name, hs);
+ }
+
+
+
+ @Override
public DistributedMember getMyId() {
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXml.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXml.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXml.java
index aa7d49a..c6b0509 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXml.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXml.java
@@ -487,6 +487,8 @@ public abstract class CacheXml implements EntityResolver2, ErrorHandler {
protected static final String PERSISTENT_REPLICATE_DP = "persistent-replicate";
protected static final String PARTITION_DP = "partition";
protected static final String PERSISTENT_PARTITION_DP = "persistent-partition";
+ protected static final String HDFS_PARTITION_DP = "hdfs-partition";
+ protected static final String HDFS_PERSISTENT_PARTITION_DP = "hdfs-persistent-partition";
/** The name of the <code>keep-alive-timeout</code> attribute */
protected static final String KEEP_ALIVE_TIMEOUT = "keep-alive-timeout";
@@ -763,6 +765,35 @@ public abstract class CacheXml implements EntityResolver2, ErrorHandler {
public static final String ASYNC_EVENT_QUEUE = "async-event-queue";
protected static final String ASYNC_EVENT_QUEUE_IDS = "async-event-queue-ids";
+ protected static final String HDFS_EVENT_QUEUE = "hdfs-event-queue";
+ protected static final String HDFS_STORE_NAME = "hdfs-store-name";
+ public static final String HDFS_STORE = "hdfs-store";
+ protected static final String HDFS_HOME_DIR = "home-dir";
+ protected static final String HDFS_READ_CACHE_SIZE = "read-cache-size";
+ protected static final String HDFS_MAX_MEMORY = "max-memory";
+ protected static final String HDFS_BATCH_SIZE = "batch-size";
+ protected static final String HDFS_BATCH_INTERVAL = "batch-interval";
+ protected static final String HDFS_DISPATCHER_THREADS = "dispatcher-threads";
+ protected static final String HDFS_BUFFER_PERSISTENT = "buffer-persistent";
+ protected static final String HDFS_SYNCHRONOUS_DISK_WRITE = "synchronous-disk-write";
+ protected static final String HDFS_DISK_STORE = "disk-store";
+ protected static final String HDFS_MAX_WRITE_ONLY_FILE_SIZE = "max-write-only-file-size";
+ public static final String HDFS_WRITE_ONLY_FILE_ROLLOVER_INTERVAL = "write-only-file-rollover-interval";
+
+ protected static final String HDFS_NAMENODE_URL = "namenode-url";
+ protected static final String HDFS_CLIENT_CONFIG_FILE = "hdfs-client-config-file";
+ public static final String HDFS_PURGE_INTERVAL = "purge-interval";
+ public static final String HDFS_MAJOR_COMPACTION = "major-compaction";
+ public static final String HDFS_MAJOR_COMPACTION_INTERVAL = "major-compaction-interval";
+ public static final String HDFS_MAJOR_COMPACTION_THREADS = "major-compaction-threads";
+ public static final String HDFS_MINOR_COMPACTION = "minor-compaction";
+ public static final String HDFS_MINOR_COMPACTION_THREADS = "minor-compaction-threads";
+
+ public static final String HDFS_TIME_FOR_FILE_ROLLOVER = "file-rollover-time-secs";
+
+ protected static final String HDFS_WRITE_ONLY = "hdfs-write-only";
+ protected static final String HDFS_QUEUE_BATCH_SIZE = "batch-size-mb";
+
/** The name of the <code>compressor</code> attribute */
protected static final String COMPRESSOR = "compressor";
/** The name of the <code>off-heap</code> attribute