You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ma...@apache.org on 2015/11/23 21:48:53 UTC
[23/50] [abbrv] incubator-geode git commit: GEODE-544: Removes soplog code and tests
GEODE-544: Removes soplog code and tests
The "soplog" code was a partial implementation of a concurrent
LSM tree stored on local disk. It is not currently used anywhere,
so it is being removed. The interfaces used by the HDFS feature
have been retained.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/9438c8b1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/9438c8b1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/9438c8b1
Branch: refs/heads/feature/GEODE-53
Commit: 9438c8b18dcbc1e903780efdeff5928be175c8b1
Parents: 781bd8d
Author: Anthony Baker <ab...@pivotal.io>
Authored: Thu Nov 12 14:39:48 2015 -0800
Committer: Anthony Baker <ab...@pivotal.io>
Committed: Thu Nov 12 21:22:48 2015 -0800
----------------------------------------------------------------------
.../persistence/soplog/AbstractCompactor.java | 533 -------------
.../soplog/AbstractKeyValueIterator.java | 76 --
.../soplog/AbstractSortedReader.java | 135 ----
.../soplog/ArraySerializedComparator.java | 144 ----
.../cache/persistence/soplog/Compactor.java | 174 -----
.../soplog/CompositeSerializedComparator.java | 57 --
.../soplog/IndexSerializedComparator.java | 127 ---
.../cache/persistence/soplog/LevelTracker.java | 120 ---
.../soplog/LexicographicalComparator.java | 460 -----------
.../cache/persistence/soplog/NonCompactor.java | 110 ---
.../soplog/ReversingSerializedComparator.java | 67 --
.../persistence/soplog/SizeTieredCompactor.java | 198 -----
.../cache/persistence/soplog/SoplogToken.java | 116 ---
.../cache/persistence/soplog/SortedBuffer.java | 367 ---------
.../cache/persistence/soplog/SortedOplog.java | 158 ----
.../persistence/soplog/SortedOplogFactory.java | 278 -------
.../persistence/soplog/SortedOplogSet.java | 118 ---
.../persistence/soplog/SortedOplogSetImpl.java | 780 -------------------
.../soplog/hfile/BlockCacheHolder.java | 39 -
.../soplog/hfile/HFileSortedOplog.java | 694 -----------------
.../soplog/hfile/HFileSortedOplogFactory.java | 80 --
.../soplog/nofile/NoFileSortedOplog.java | 244 ------
.../soplog/nofile/NoFileSortedOplogFactory.java | 41 -
.../cache/persistence/soplog/AppendLog.java | 65 --
.../ArraySerializedComparatorJUnitTest.java | 95 ---
.../CompactionSortedOplogSetTestCase.java | 134 ----
.../persistence/soplog/CompactionTestCase.java | 206 -----
.../persistence/soplog/ComparisonTestCase.java | 77 --
.../soplog/IndexComparatorJUnitTest.java | 79 --
.../LexicographicalComparatorJUnitTest.java | 204 -----
.../soplog/RecoverableSortedOplogSet.java | 221 ------
.../soplog/SizeTieredCompactorJUnitTest.java | 110 ---
.../SizeTieredSortedOplogSetJUnitTest.java | 43 -
.../soplog/SortedBufferJUnitTest.java | 39 -
.../soplog/SortedOplogSetJUnitTest.java | 273 -------
.../soplog/SortedReaderTestCase.java | 295 -------
.../nofile/NoFileSortedOplogJUnitTest.java | 48 --
37 files changed, 7005 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9438c8b1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/AbstractCompactor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/AbstractCompactor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/AbstractCompactor.java
deleted file mode 100644
index 0b62313..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/AbstractCompactor.java
+++ /dev/null
@@ -1,533 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.EnumMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplog.SortedOplogReader;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplog.SortedOplogWriter;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogSetImpl.MergedIterator;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.Metadata;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SerializedComparator;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SortedIterator;
-import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.util.AbortableTaskService;
-import com.gemstone.gemfire.internal.util.AbortableTaskService.AbortableTask;
-
-public abstract class AbstractCompactor<T extends Comparable<T>> implements Compactor {
- protected static final Logger logger = LogService.getLogger();
-
- /** the soplog factory */
- protected final SortedOplogFactory factory;
-
- /** the fileset */
- protected final Fileset<T> fileset;
-
- /** the soplog tracker */
- protected final CompactionTracker<T> tracker;
-
- /** thread for background compaction */
- protected final AbortableTaskService compactor;
-
- /** inactive files waiting to be deleted */
- private final Queue<TrackedReference<SortedOplogReader>> inactive;
-
- /** the soplogs */
- protected final List<Level> levels;
-
- /** provides consistent view of all levels */
- private final ReadWriteLock levelLock;
-
- /** test flag to abort compaction */
- volatile boolean testAbortDuringCompaction;
-
- /** test flag to delay compaction */
- volatile CountDownLatch testDelayDuringCompaction;
-
- protected final String logPrefix;
-
- public AbstractCompactor(SortedOplogFactory factory,
- Fileset<T> fileset, CompactionTracker<T> tracker,
- Executor exec) {
- assert factory != null;
- assert fileset != null;
- assert tracker != null;
- assert exec != null;
-
- this.factory = factory;
- this.fileset = fileset;
- this.tracker = tracker;
-
- compactor = new AbortableTaskService(exec);
- inactive = new ConcurrentLinkedQueue<TrackedReference<SortedOplogReader>>();
-
- levelLock = new ReentrantReadWriteLock();
- levels = new ArrayList<Level>();
-
- this.logPrefix = "<" + factory.getConfiguration().getName() + "> ";
- }
-
- @Override
- public final void add(SortedOplog soplog) throws IOException {
- levels.get(0).add(soplog);
- }
-
- @Override
- public final boolean compact() throws IOException {
- final CountDownLatch done = new CountDownLatch(1);
- final AtomicReference<Object> result = new AtomicReference<Object>(null);
-
- compact(true, new CompactionHandler() {
- @Override
- public void complete(boolean compacted) {
- result.set(compacted);
- done.countDown();
- }
-
- @Override
- public void failed(Throwable ex) {
- result.set(ex);
- done.countDown();
- }
- });
-
- try {
- done.await();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new InterruptedIOException();
- }
-
- Object val = result.get();
- if (val instanceof Throwable) {
- throw new IOException((Throwable) val);
- }
-
- assert val != null;
- return (Boolean) val;
- }
-
- @Override
- public final void compact(final boolean force, final CompactionHandler ch) {
- // TODO implement force=true, results in a single soplog
- AbortableTask task = new AbortableTask() {
- @Override
- public void runOrAbort(AtomicBoolean aborted) {
- final boolean isDebugEnabled = logger.isDebugEnabled();
- if (isDebugEnabled) {
- logger.debug("{}Beginning compaction", AbstractCompactor.this.logPrefix);
- }
-
- // TODO could do this in one go instead of level-by-level
- try {
- boolean compacted = false;
- for (Level level : levels) {
- if (aborted.get()) {
- if (isDebugEnabled) {
- logger.debug("{}Aborting compaction", AbstractCompactor.this.logPrefix);
- }
- break;
- }
-
- checkTestDelay();
- if (force || level.needsCompaction()) {
- if (isDebugEnabled) {
- logger.debug("{}Compacting level {}", AbstractCompactor.this.logPrefix, level);
- }
-
- long start = factory.getConfiguration().getStatistics().getMinorCompaction().begin();
- try {
- compacted |= level.compact(aborted);
- factory.getConfiguration().getStatistics().getMinorCompaction().end(start);
-
- } catch (IOException e) {
- factory.getConfiguration().getStatistics().getMinorCompaction().error(start);
- }
- }
- }
-
- cleanupInactive();
- if (ch != null) {
- if (isDebugEnabled) {
- logger.debug("{}Completed compaction", AbstractCompactor.this.logPrefix);
- }
- ch.complete(compacted);
- }
- } catch (Exception e) {
- if (isDebugEnabled) {
- logger.debug("{}Encountered an error during compaction", AbstractCompactor.this.logPrefix, e);
- }
- if (ch != null) {
- ch.failed(e);
- }
- }
- }
-
- @Override
- public void abortBeforeRun() {
- if (ch != null) {
- ch.complete(false);
- }
- }
- };
- compactor.execute(task);
- }
-
- @Override
- public final CompactionTracker<?> getTracker() {
- return tracker;
- }
-
- @Override
- public final Fileset<?> getFileset() {
- return fileset;
- }
-
- @Override
- public final Collection<TrackedReference<SortedOplogReader>> getActiveReaders(
- byte[] start, byte[] end) {
-
- // need to coordinate with clear() so we can get a consistent snapshot
- // across levels
- levelLock.readLock().lock();
- try {
- // TODO this seems very garbage-y
- List<TrackedReference<SortedOplogReader>> soplogs = new ArrayList<TrackedReference<SortedOplogReader>>();
- for (Level level : levels) {
- soplogs.addAll(level.getSnapshot(start, end));
- }
- return soplogs;
- } finally {
- levelLock.readLock().unlock();
- }
- }
-
- @Override
- public final void clear() throws IOException {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Clearing compactor", this.logPrefix);
- }
-
- compactor.abortAll();
- releaseTestDelay();
- compactor.waitForCompletion();
-
- levelLock.writeLock().lock();
- try {
- for (Level l : levels) {
- l.clear();
- }
- } finally {
- levelLock.writeLock().unlock();
- }
-
- cleanupInactive();
- }
-
- @Override
- public final void close() throws IOException {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Closing compactor", this.logPrefix);
- }
-
- compactor.abortAll();
- releaseTestDelay();
- compactor.waitForCompletion();
-
- levelLock.writeLock().lock();
- try {
- for (Level l : levels) {
- l.close();
- }
- } finally {
- levelLock.writeLock().unlock();
- }
-
- TrackedReference<SortedOplogReader> tr;
- while ((tr = inactive.poll()) != null) {
- deleteInactive(tr);
- }
- inactive.clear();
- }
-
- /**
- * Creates a new soplog by merging the supplied soplog readers.
- *
- * @param readers the readers to merge
- * @param collect true if deleted entries should be removed
- * @return the merged soplog
- *
- * @throws IOException error during merge operation
- */
- protected SortedOplog merge(
- Collection<TrackedReference<SortedOplogReader>> readers,
- boolean collect,
- AtomicBoolean aborted) throws IOException {
-
- SerializedComparator sc = null;
- List<SortedIterator<ByteBuffer>> iters = new ArrayList<SortedIterator<ByteBuffer>>();
- for (TrackedReference<SortedOplogReader> tr : readers) {
- iters.add(tr.get().scan());
- sc = tr.get().getComparator();
- }
-
- SortedIterator<ByteBuffer> scan = new MergedIterator(sc, readers, iters);
- try {
- if (!scan.hasNext()) {
- checkAbort(aborted);
- if (logger.isDebugEnabled()) {
- logger.debug("{}No entries left after compaction with readers {} ", this.logPrefix, readers);
- }
- return null;
- }
-
- File f = fileset.getNextFilename();
- if (logger.isDebugEnabled()) {
- logger.debug("{}Compacting soplogs {} into {}", this.logPrefix, readers, f);
- }
-
- if (testAbortDuringCompaction) {
- aborted.set(true);
- }
-
- SortedOplog soplog = factory.createSortedOplog(f);
- SortedOplogWriter wtr = soplog.createWriter();
- try {
- while (scan.hasNext()) {
- checkAbort(aborted);
- scan.next();
- if (!(collect && isDeleted(scan.value()))) {
- wtr.append(scan.key(), scan.value());
- }
- }
-
- EnumMap<Metadata, byte[]> metadata = mergeMetadata(readers);
- wtr.close(metadata);
- return soplog;
-
- } catch (IOException e) {
- wtr.closeAndDelete();
- throw e;
- }
- } finally {
- scan.close();
- }
- }
-
- protected EnumMap<Metadata, byte[]> mergeMetadata(
- Collection<TrackedReference<SortedOplogReader>> readers)
- throws IOException {
- // merge the metadata into the compacted file
- EnumMap<Metadata, byte[]> metadata = new EnumMap<Metadata, byte[]>(Metadata.class);
- for (Metadata meta : Metadata.values()) {
- byte[] val = null;
- for (TrackedReference<SortedOplogReader> tr : readers) {
- byte[] tmp = tr.get().getMetadata(meta);
- if (val == null) {
- val = tmp;
-
- } else if (tmp != null) {
- val = factory.getConfiguration().getMetadataCompactor(meta).compact(val, tmp);
- }
- }
- if (val != null) {
- metadata.put(meta, val);
- }
- }
- return metadata;
- }
-
- protected void releaseTestDelay() {
- if (testDelayDuringCompaction != null) {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Releasing testDelayDuringCompaction", this.logPrefix);
- }
- testDelayDuringCompaction.countDown();
- }
- }
-
- protected void checkTestDelay() {
- if (testDelayDuringCompaction != null) {
- try {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Waiting for testDelayDuringCompaction", this.logPrefix);
- }
- testDelayDuringCompaction.await();
-
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
-
- /**
- * Returns the number of inactive readers.
- * @return the inactive readers
- */
- protected int countInactiveReaders() {
- return inactive.size();
- }
-
- /**
- * Returns the requested level for testing purposes.
- * @param level the level ordinal
- * @return the level
- */
- protected Level getLevel(int level) {
- return levels.get(level);
- }
-
- protected void cleanupInactive() throws IOException {
- for (Iterator<TrackedReference<SortedOplogReader>> iter = inactive.iterator(); iter.hasNext(); ) {
- TrackedReference<SortedOplogReader> tr = iter.next();
- if (!tr.inUse() && inactive.remove(tr)) {
- deleteInactive(tr);
- }
- }
- }
-
- protected void markAsInactive(Iterable<TrackedReference<SortedOplogReader>> snapshot, T attach) throws IOException {
- final boolean isDebugEnabled = logger.isDebugEnabled();
- for (Iterator<TrackedReference<SortedOplogReader>> iter = snapshot.iterator(); iter.hasNext(); ) {
- TrackedReference<SortedOplogReader> tr = iter.next();
- if (isDebugEnabled) {
- logger.debug("{}Marking {} as inactive", this.logPrefix, tr);
- }
-
- inactive.add(tr);
- tracker.fileRemoved(tr.get().getFile(), attach);
-
- factory.getConfiguration().getStatistics().incActiveFiles(-1);
- factory.getConfiguration().getStatistics().incInactiveFiles(1);
- }
- }
-
- private boolean isDeleted(ByteBuffer value) {
- //first byte determines the value type
- byte valType = value.get(value.position());
- return SoplogToken.isTombstone(valType) || SoplogToken.isRemovedPhase2(valType);
- }
-
- private void checkAbort(AtomicBoolean aborted)
- throws InterruptedIOException {
- if (aborted.get()) {
- throw new InterruptedIOException();
- }
- }
-
- private void deleteInactive(TrackedReference<SortedOplogReader> tr)
- throws IOException {
- tr.get().close();
- if (tr.get().getFile().delete()) {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Deleted inactive soplog {}", this.logPrefix, tr.get().getFile());
- }
-
- tracker.fileDeleted(tr.get().getFile());
- factory.getConfiguration().getStatistics().incInactiveFiles(-1);
- }
- }
-
- /**
- * Organizes a set of soplogs for a given level.
- */
- protected static abstract class Level {
- /** the level ordinal position */
- protected final int level;
-
- public Level(int level) {
- this.level = level;
- }
-
- @Override
- public String toString() {
- return String.valueOf(level);
- }
-
- /**
- * Returns true if the level needs compaction.
- * @return true if compaction is needed
- */
- protected abstract boolean needsCompaction();
-
- /**
- * Obtains the current set of active soplogs for this level.
- * @return the soplog snapshot
- */
- protected List<TrackedReference<SortedOplogReader>> getSnapshot() {
- return getSnapshot(null, null);
- }
-
- /**
- * Obtains the current set of active soplogs for this level, optionally
- * bounded by the start and end keys.
- *
- * @param start the start key
- * @param end the end key
- * @return the soplog snapshot
- */
- protected abstract List<TrackedReference<SortedOplogReader>> getSnapshot(byte[] start, byte[] end);
-
- /**
- * Clears the soplogs that match the metadata filter.
- * @throws IOException error during close
- */
- protected abstract void clear() throws IOException;
-
- /**
- * Closes the soplogs managed by this level.
- * @throws IOException error closing soplogs
- */
- protected abstract void close() throws IOException;
-
- /**
- * Adds a new soplog to this level.
- *
- * @param soplog the soplog
- * @throws IOException error creating reader
- */
- protected abstract void add(SortedOplog soplog) throws IOException;
-
- /**
- * Merges the current soplogs into a new soplog and promotes it to the next
- * level. The previous soplogs are marked for deletion.
- *
- * @param aborted true if the compaction should be aborted
- * @throws IOException error unable to perform compaction
- */
- protected abstract boolean compact(AtomicBoolean aborted) throws IOException;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9438c8b1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/AbstractKeyValueIterator.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/AbstractKeyValueIterator.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/AbstractKeyValueIterator.java
deleted file mode 100644
index 1326d5c..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/AbstractKeyValueIterator.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-/**
- * Provides an {@link Iterator} view over a collection of keys and values. The
- * implementor must provide access to the current key/value as well as a means
- * to move to the next pair.
- *
- * @author bakera
- *
- * @param <K> the key type
- * @param <V> the value type
- */
-public abstract class AbstractKeyValueIterator<K, V> implements KeyValueIterator<K, V> {
- /** true if the iterator has been advanced to the next element */
- private boolean foundNext = false;
-
- @Override
- public boolean hasNext() {
- if (!foundNext) {
- foundNext = step();
- }
- return foundNext;
- }
-
- @Override
- public K next() {
- if (!foundNext && !step()) {
- throw new NoSuchElementException();
- }
-
- foundNext = false;
- return key();
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- /**
- * Returns the key at the current position.
- * @return the key
- */
- public abstract K key();
-
- /**
- * Returns the value at the current position.
- * @return the value
- */
- public abstract V value();
-
- /**
- * Steps the iteration to the next position.
- * @return true if the step succeeded
- */
- protected abstract boolean step();
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9438c8b1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/AbstractSortedReader.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/AbstractSortedReader.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/AbstractSortedReader.java
deleted file mode 100644
index c11e1e0..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/AbstractSortedReader.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-
-/**
- * Provides default behavior for range scans.
- *
- * @author bakera
- */
-public abstract class AbstractSortedReader implements SortedReader<ByteBuffer> {
- @Override
- public final SortedIterator<ByteBuffer> scan() throws IOException {
- return scan(null, true, null, true);
- }
-
- @Override
- public final SortedIterator<ByteBuffer> head(byte[] to, boolean inclusive) throws IOException{
- return scan(null, true, to, inclusive);
- }
-
- @Override
- public final SortedIterator<ByteBuffer> tail(byte[] from, boolean inclusive) throws IOException{
- return scan(from, inclusive, null, true);
- }
-
- @Override
- public final SortedIterator<ByteBuffer> scan(byte[] from, byte[] to) throws IOException{
- return scan(from, true, to, false);
- }
-
- @Override
- public final SortedIterator<ByteBuffer> scan(byte[] equalTo) throws IOException{
- return scan(equalTo, true, equalTo, true);
- }
-
- @Override
- public SortedIterator<ByteBuffer> scan(byte[] from, boolean fromInclusive, byte[] to,
- boolean toInclusive) throws IOException{
- return scan(from, fromInclusive, to, toInclusive, true, null);
- }
-
- @Override
- public final SortedReader<ByteBuffer> withAscending(boolean ascending) {
- if (this instanceof DelegateSortedReader) {
- DelegateSortedReader tmp = (DelegateSortedReader) this;
- return new DelegateSortedReader(tmp.delegate, ascending, tmp.filter);
- }
- return new DelegateSortedReader(this, ascending, null);
- }
-
- @Override
- public final SortedReader<ByteBuffer> withFilter(MetadataFilter filter) {
- if (this instanceof DelegateSortedReader) {
- DelegateSortedReader tmp = (DelegateSortedReader) this;
- return new DelegateSortedReader(tmp.delegate, tmp.ascending, filter);
- }
- return new DelegateSortedReader(this, true, filter);
- }
-
- protected class DelegateSortedReader extends AbstractSortedReader {
- /** the embedded reader */
- private final AbstractSortedReader delegate;
-
- /** true if ascending */
- private final boolean ascending;
-
- /** the filter */
- private final MetadataFilter filter;
-
- public DelegateSortedReader(AbstractSortedReader reader, boolean ascending, MetadataFilter filter) {
- this.delegate = reader;
- this.ascending = ascending;
- this.filter = filter;
- }
-
- @Override
- public boolean mightContain(byte[] key) throws IOException {
- return delegate.mightContain(key);
- }
-
- @Override
- public ByteBuffer read(byte[] key) throws IOException {
- return delegate.read(key);
- }
-
- @Override
- public SerializedComparator getComparator() {
- return delegate.getComparator();
- }
-
- @Override
- public SortedStatistics getStatistics() throws IOException {
- return delegate.getStatistics();
- }
-
- @Override
- public void close() throws IOException {
- delegate.close();
- }
-
- @Override
- public SortedIterator<ByteBuffer> scan(
- byte[] from, boolean fromInclusive,
- byte[] to, boolean toInclusive) throws IOException {
- return scan(from, fromInclusive, to, toInclusive, ascending, filter);
- }
-
- @Override
- public SortedIterator<ByteBuffer> scan(
- byte[] from, boolean fromInclusive,
- byte[] to, boolean toInclusive,
- boolean ascending,
- MetadataFilter filter) throws IOException {
- return delegate.scan(from, fromInclusive, to, toInclusive, ascending, filter);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9438c8b1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ArraySerializedComparator.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ArraySerializedComparator.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ArraySerializedComparator.java
deleted file mode 100644
index 139b3cb..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ArraySerializedComparator.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.nio.ByteBuffer;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SerializedComparator;
-import com.gemstone.gemfire.internal.util.Bytes;
-
-/**
- * Provides comparisons of composite keys by comparing each of the constituent
- * parts of the key in order. A subkey will only be evaluated if the preceeding
- * keys have compared as equal.
- * <p>
- * Prior to use, an instance must be configured with the ordered list of
- * comparators to apply.
- * <p>
- * The keys for an N-composite are stored as follows:
- * <pre>
- * | len[0] | key[0] | len[1] | key[1] | ... | len[N-2] | key[N-2] | key[N-1] |
- * </pre>
- * where the key length is stored as a protobuf varint.
- *
- * @author bakera
- */
-public class ArraySerializedComparator implements CompositeSerializedComparator,
-DelegatingSerializedComparator {
-
- /** the comparators */
- private volatile SerializedComparator[] comparators;
-
- /**
- * Injects the comparators to be used on composite keys. The number and order
- * must match the key.
- *
- * @param comparators the comparators
- */
- public void setComparators(SerializedComparator[] comparators) {
- this.comparators = comparators;
- }
-
- @Override
- public int compare(byte[] o1, byte[] o2) {
- return compare(o1, 0, o1.length, o2, 0, o2.length);
- }
-
- @Override
- public int compare(byte[] b1, int o1, int l1, byte[] b2, int o2, int l2) {
- SerializedComparator[] sc = comparators;
-
- int off1 = o1;
- int off2 = o2;
- for (int i = 0; i < sc.length - 1; i++) {
- int klen1 = Bytes.getVarInt(b1, off1);
- int klen2 = Bytes.getVarInt(b2, off2);
-
- off1 += Bytes.sizeofVarInt(klen1);
- off2 += Bytes.sizeofVarInt(klen2);
-
- if (!SoplogToken.isWildcard(b1, off1, b2, off2)) {
- int diff = sc[i].compare(b1, off1, klen1, b2, off2, klen2);
- if (diff != 0) {
- return diff;
- }
- }
- off1 += klen1;
- off2 += klen2;
- }
-
- if (!SoplogToken.isWildcard(b1, off1, b2, off2)) {
- l1 -= (off1 - o1);
- l2 -= (off2 - o2);
- return sc[sc.length - 1].compare(b1, off1, l1, b2, off2, l2);
- }
- return 0;
- }
-
- @Override
- public SerializedComparator[] getComparators() {
- return comparators;
- }
-
- @Override
- public byte[] createCompositeKey(byte[] key1, byte[] key2) {
- return createCompositeKey(new byte[][] { key1, key2 });
- }
-
- @Override
- public byte[] createCompositeKey(byte[]... keys) {
- assert comparators.length == keys.length;
-
- int size = 0;
- for (int i = 0; i < keys.length - 1; i++) {
- size += keys[i].length + Bytes.sizeofVarInt(keys[i].length);
- }
- size += keys[keys.length - 1].length;
-
- // TODO do we have to do a copy here or can we delay until the disk write?
- int off = 0;
- byte[] buf = new byte[size];
- for (int i = 0; i < keys.length - 1; i++) {
- off = Bytes.putVarInt(keys[i].length, buf, off);
- System.arraycopy(keys[i], 0, buf, off, keys[i].length);
- off += keys[i].length;
- }
- System.arraycopy(keys[keys.length - 1], 0, buf, off, keys[keys.length - 1].length);
- return buf;
- }
-
- @Override
- public ByteBuffer getKey(ByteBuffer key, int ordinal) {
- assert ordinal < comparators.length;
-
- for (int i = 0; i < comparators.length - 1; i++) {
- int klen = Bytes.getVarInt(key);
- if (i == ordinal) {
- ByteBuffer subkey = (ByteBuffer) key.slice().limit(klen);
- key.rewind();
-
- return subkey;
- }
- key.position(key.position() + klen);
- }
-
- ByteBuffer subkey = key.slice();
- key.rewind();
-
- return subkey;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9438c8b1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/Compactor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/Compactor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/Compactor.java
deleted file mode 100644
index c80f118..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/Compactor.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.SortedMap;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplog.SortedOplogReader;
-
-/**
- * Defines a mechanism to track and compact soplogs.
- *
- * @author bakera
- */
-public interface Compactor {
- /**
- * Combines metadata values during compaction.
- */
- public interface MetadataCompactor {
- /**
- * Combines two metadata values into a single value. Used during compaction
- * to merge metadata between soplog files.
- *
- * @param metadata1 the first value
- * @param metadata2 the second value
- * @return the combined metadata
- */
- byte[] compact(byte[] metadata1, byte[] metadata2);
- }
-
- /**
- * Provides notification on the status of a compaction.
- */
- public interface CompactionHandler {
- /**
- * Invoked when a compaction operation has completed successfully.
- * @param compacted true if any files were compacted
- */
- void complete(boolean compacted);
-
- /**
- * Invoked when a compaction operation has failed.
- * @param ex the failure
- */
- void failed(Throwable ex);
- }
-
- /**
- * Provides external configuration of file operations for recovering and
- * new file creation.
- *
- * @param <T> the compaction info
- */
- public interface Fileset<T extends Comparable<T>> {
- /**
- * Returns the set of active soplogs.
- * @return the active files
- */
- SortedMap<T, ? extends Iterable<File>> recover();
-
- /**
- * Returns the pathname for the next soplog.
- * @return the soplog filename
- */
- File getNextFilename();
- }
-
- /**
- * Provides a mechanism to coordinate file changes to the levels managed
- * by the compactor.
- *
- * @param <T> the attachment type
- */
- public interface CompactionTracker<T extends Comparable<T>> {
- /**
- * Invoked when a new file is added.
- * @param f the file
- * @param attach the attachment
- */
- void fileAdded(File f, T attach);
-
- /**
- * Invoked when a file is removed.
- * @param f the file
- * @param attach the attachment
- */
- void fileRemoved(File f, T attach);
-
- /**
- * Invoked when a file is deleted.
- * @param f the file
- */
- void fileDeleted(File f);
- }
-
- /**
- * Synchronously invokes the force compaction operation and waits for completion.
- *
- * @return true if any files were compacted
- * @throws IOException error during compaction
- */
- boolean compact() throws IOException;
-
- /**
- * Requests a compaction operation be performed on the soplogs. This invocation
- * may block if there are too many outstanding write requests.
- * <p>
- * No checked exception is declared by this method; presumably failures are
- * reported through {@link CompactionHandler#failed(Throwable)} -- confirm
- * against the implementations.
- *
- * @param force if false, compaction will only be performed if necessary
- * @param ch invoked when the compaction is complete, optionally null
- */
- void compact(boolean force, CompactionHandler ch);
-
- /**
- * Returns the active readers for the given key range. The caller is responsible
- * for decrementing the use count of each reader when finished.
- *
- * @param start the start key inclusive, or null for beginning
- * @param end the end key inclusive, or null for last
- * @return the readers
- *
- * @see TrackedReference
- */
- Collection<TrackedReference<SortedOplogReader>> getActiveReaders(
- byte[] start, byte[] end);
-
- /**
- * Adds a new soplog to the active set.
- * @param soplog the soplog
- * @throws IOException unable to add soplog
- */
- void add(SortedOplog soplog) throws IOException;
-
- /**
- * Returns the compaction tracker for coordinating changes to the file set.
- * @return the tracker
- */
- CompactionTracker<?> getTracker();
-
- /**
- * Returns the file manager for managing the soplog files.
- * @return the fileset
- */
- Fileset<?> getFileset();
-
- /**
- * Clears the active files managed by the compactor. Files will be marked as
- * inactive and eventually deleted.
- *
- * @throws IOException unable to clear
- */
- void clear() throws IOException;
- /**
- * Closes the compactor.
- * @throws IOException unable to close
- */
- void close() throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9438c8b1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/CompositeSerializedComparator.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/CompositeSerializedComparator.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/CompositeSerializedComparator.java
deleted file mode 100644
index 8d9aae5..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/CompositeSerializedComparator.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.nio.ByteBuffer;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SerializedComparator;
-
-/**
- * Creates and compares composite keys.
- *
- * @author bakera
- */
-public interface CompositeSerializedComparator extends SerializedComparator {
- /**
- * Constructs a composite key consisting of a primary key and a secondary key.
- *
- * @param key1 the primary key
- * @param key2 the secondary key
- * @return the composite key
- */
- public byte[] createCompositeKey(byte[] key1, byte[] key2);
-
- /**
- * Constructs a composite key by combining the supplied keys. The number of
- * keys and their order must match the comparator set.
- * <p>
- * The <code>WILDCARD_KEY</code> token may be used to match all subkeys in the
- * given ordinal position. This is useful when constructing a search key to
- * retrieve all keys for a given primary key, ignoring the remaining subkeys.
- *
- * @param keys the keys, ordered by sort priority
- * @return the composite key
- */
- public byte[] createCompositeKey(byte[]... keys);
-
- /**
- * Returns subkey for the given ordinal position.
- * @param key the composite key
- * @param ordinal the position of the requested subkey within the composite key
- * (NOTE(review): implementations in this package treat 0 as the first,
- * i.e. primary, subkey -- confirm for any new implementation)
- * @return the subkey
- */
- public ByteBuffer getKey(ByteBuffer key, int ordinal);
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9438c8b1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/IndexSerializedComparator.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/IndexSerializedComparator.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/IndexSerializedComparator.java
deleted file mode 100644
index 816eea0..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/IndexSerializedComparator.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.nio.ByteBuffer;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SerializedComparator;
-import com.gemstone.gemfire.internal.util.Bytes;
-
-/**
- * Provides a comparator for composite keys of the form (k0, k1). The primary
- * keys are compared lexicographically while the secondary keys are compared
- * bitwise. The key format includes the primary key length to avoid deserialization
- * the secondary key when reading:
- * <pre>
- * | varint | primary key | secondary key |
- * </pre>
- * The key length is encoded using a protobuf-style varint.
- * <p>
- *
- * @author bakera
- */
-public class IndexSerializedComparator implements CompositeSerializedComparator,
-DelegatingSerializedComparator {
-
- private volatile SerializedComparator primary;
- private volatile SerializedComparator secondary;
-
- public IndexSerializedComparator() {
- primary = new LexicographicalComparator();
- secondary = new ByteComparator();
- }
-
- @Override
- public void setComparators(SerializedComparator[] comparators) {
- assert comparators.length == 2;
-
- primary = comparators[0];
- secondary = comparators[1];
- }
-
- @Override
- public SerializedComparator[] getComparators() {
- return new SerializedComparator[] { primary, secondary };
- }
-
- @Override
- public int compare(byte[] o1, byte[] o2) {
- return compare(o1, 0, o1.length, o2, 0, o2.length);
- }
-
- @Override
- public int compare(byte[] b1, int o1, int l1, byte[] b2, int o2, int l2) {
- int klen1 = Bytes.getVarInt(b1, o1);
- int klen2 = Bytes.getVarInt(b2, o2);
-
- int off1 = o1 + Bytes.sizeofVarInt(klen1);
- int off2 = o2 + Bytes.sizeofVarInt(klen2);
-
- // skip the comparison operation if there is a SearchToken.WILDCARD
- if (!SoplogToken.isWildcard(b1, off1, b2, off2)) {
- int diff = primary.compare(b1, off1, klen1, b2, off2, klen2);
- if (diff != 0) {
- return diff;
- }
- }
- off1 += klen1;
- off2 += klen2;
-
- if (!SoplogToken.isWildcard(b1, off1, b2, off2)) {
- l1 -= (off1 - o1);
- l2 -= (off2 - o2);
- return secondary.compare(b1, off1, l1, b2, off2, l2);
- }
- return 0;
- }
-
- @Override
- public ByteBuffer getKey(ByteBuffer key, int ordinal) {
- assert ordinal < 2;
-
- ByteBuffer subkey;
- int klen = Bytes.getVarInt(key);
- if (ordinal == 0) {
- subkey = (ByteBuffer) key.slice().limit(klen);
-
- } else {
- subkey = ((ByteBuffer) key.position(key.position() + klen)).slice();
- }
-
- key.rewind();
- return subkey;
- }
-
- @Override
- public byte[] createCompositeKey(byte[] key1, byte[] key2) {
- int vlen = Bytes.sizeofVarInt(key1.length);
- byte[] buf = new byte[vlen + key1.length + key2.length];
-
- Bytes.putVarInt(key1.length, buf, 0);
- System.arraycopy(key1, 0, buf, vlen, key1.length);
- System.arraycopy(key2, 0, buf, vlen + key1.length, key2.length);
-
- return buf;
- }
-
- @Override
- public byte[] createCompositeKey(byte[]... keys) {
- assert keys.length == 2;
-
- return createCompositeKey(keys[0], keys[1]);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9438c8b1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/LevelTracker.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/LevelTracker.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/LevelTracker.java
deleted file mode 100644
index a590283..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/LevelTracker.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.io.Closeable;
-import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.LineNumberReader;
-import java.io.Writer;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.Compactor.CompactionTracker;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.Compactor.Fileset;
-
-/**
- * A simple, non-robust file tracker for tracking soplogs by level.
- *
- * @author bakera
- */
-public class LevelTracker implements Fileset<Integer>, CompactionTracker<Integer>, Closeable {
-  /** prefix used when generating soplog file names */
-  private final String name;
-
-  /** manifest file holding one "level,path" line per tracked soplog */
-  private final File manifest;
-
-  /** tracked files grouped by level */
-  private final SortedMap<Integer, Set<File>> levels;
-
-  /** sequence number appended to generated file names */
-  private final AtomicLong file;
-
-  /**
-   * Creates a tracker, loading any previously saved state from the manifest.
-   *
-   * @param name the prefix for generated soplog file names
-   * @param manifest the manifest file; it does not need to exist yet
-   * @throws IOException if the manifest cannot be read or contains a malformed line
-   */
-  public LevelTracker(String name, File manifest) throws IOException {
-    this.name = name;
-    this.manifest = manifest;
-    file = new AtomicLong(0);
-
-    levels = new TreeMap<Integer, Set<File>>();
-    if (!manifest.exists()) {
-      return;
-    }
-
-    LineNumberReader rdr = new LineNumberReader(new FileReader(manifest));
-    try {
-      String line;
-      while ((line = rdr.readLine()) != null) {
-        // Only the first comma is a delimiter: close() writes "level,path", so a
-        // path that itself contains commas must survive the round trip intact.
-        String[] parts = line.split(",", 2);
-        if (parts.length != 2 || parts[1].isEmpty()) {
-          throw new IOException("Malformed manifest entry at " + manifest + ":"
-              + rdr.getLineNumber() + ": " + line);
-        }
-
-        int level;
-        try {
-          level = Integer.parseInt(parts[0]);
-        } catch (NumberFormatException e) {
-          throw new IOException("Invalid level at " + manifest + ":"
-              + rdr.getLineNumber() + ": " + line, e);
-        }
-        add(new File(parts[1]), level);
-      }
-    } finally {
-      rdr.close();
-    }
-  }
-
-  /**
-   * Returns the live level-to-files map (not a copy).
-   */
-  @Override
-  public SortedMap<Integer, ? extends Iterable<File>> recover() {
-    return levels;
-  }
-
-  /**
-   * Generates a file name of the form {@code <name>-<millis>-<sequence>.soplog}
-   * located in the manifest's parent directory.
-   */
-  @Override
-  public File getNextFilename() {
-    return new File(manifest.getParentFile(), name + "-" + System.currentTimeMillis()
-        + "-" + file.getAndIncrement() + ".soplog");
-  }
-
-  @Override
-  public void fileAdded(File f, Integer attach) {
-    add(f, attach);
-  }
-
-  /**
-   * Stops tracking the file at the given level. Removing a file from a level
-   * that was never tracked is a no-op rather than a NullPointerException.
-   */
-  @Override
-  public void fileRemoved(File f, Integer attach) {
-    Set<File> files = levels.get(attach);
-    if (files != null) {
-      files.remove(f);
-    }
-  }
-
-  @Override
-  public void fileDeleted(File f) {
-    // nothing to do: tracking state is updated in fileRemoved
-  }
-
-  /**
-   * Writes the tracked files to the manifest, one "level,path" line per file.
-   *
-   * @throws IOException if the manifest cannot be written
-   */
-  @Override
-  public void close() throws IOException {
-    Writer wtr = new FileWriter(manifest);
-    try {
-      for (Map.Entry<Integer, Set<File>> entry : levels.entrySet()) {
-        for (File f : entry.getValue()) {
-          wtr.write(entry.getKey() + "," + f + "\n");
-        }
-      }
-      wtr.flush();
-    } finally {
-      // close() is the only call in finally so that a failing flush can no
-      // longer prevent the writer from being released
-      wtr.close();
-    }
-  }
-
-  /** Registers the file under the level, creating the level's set on first use. */
-  private void add(File f, int level) {
-    Set<File> files = levels.get(level);
-    if (files == null) {
-      files = new HashSet<File>();
-      levels.put(level, files);
-    }
-    files.add(f);
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9438c8b1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/LexicographicalComparator.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/LexicographicalComparator.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/LexicographicalComparator.java
deleted file mode 100644
index 24fba50..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/LexicographicalComparator.java
+++ /dev/null
@@ -1,460 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-
-import com.gemstone.gemfire.DataSerializer;
-import com.gemstone.gemfire.internal.DSCODE;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SerializedComparator;
-import com.gemstone.gemfire.internal.util.Bytes;
-
-/**
- * Provides type-optimized comparisons for serialized objects. All data is
- * assumed to have been serialized via a call to
- * {@link DataSerializer#writeObject(Object, java.io.DataOutput) }. The following
- * data types have optimized comparisons:
- * <ul>
- * <li>boolean
- * <li>byte
- * <li>short
- * <li>char
- * <li>int
- * <li>long
- * <li>float
- * <li>double
- * <li>String (not {@link DSCODE#HUGE_STRING} or {@link DSCODE#HUGE_STRING_BYTES})
- * </ul>
- * Types that are not listed above fall back to deserialization and comparison
- * via the {@link Comparable} API.
- * <p>
- * Any numeric type may be compared against another numeric type (e.g. double
- * to int).
- * <p>
- * <strong>Any changes to the serialized format may cause version incompatibilities.
- * In addition, the comparison operations will need to be updated.</strong>
- * <p>
- *
- * @author bakera
- */
-public class LexicographicalComparator implements SerializedComparator {
-
-  //////////////////////////////////////////////////////////////////////////////
-  //
-  // constants for any-to-any numeric comparisons
-  //
-  //////////////////////////////////////////////////////////////////////////////
-
-  private static final int BYTE_TO_BYTE     = DSCODE.BYTE << 8 | DSCODE.BYTE;
-  private static final int BYTE_TO_SHORT    = DSCODE.BYTE << 8 | DSCODE.SHORT;
-  private static final int BYTE_TO_INT      = DSCODE.BYTE << 8 | DSCODE.INTEGER;
-  private static final int BYTE_TO_LONG     = DSCODE.BYTE << 8 | DSCODE.LONG;
-  private static final int BYTE_TO_FLOAT    = DSCODE.BYTE << 8 | DSCODE.FLOAT;
-  private static final int BYTE_TO_DOUBLE   = DSCODE.BYTE << 8 | DSCODE.DOUBLE;
-
-  private static final int SHORT_TO_BYTE    = DSCODE.SHORT << 8 | DSCODE.BYTE;
-  private static final int SHORT_TO_SHORT   = DSCODE.SHORT << 8 | DSCODE.SHORT;
-  private static final int SHORT_TO_INT     = DSCODE.SHORT << 8 | DSCODE.INTEGER;
-  private static final int SHORT_TO_LONG    = DSCODE.SHORT << 8 | DSCODE.LONG;
-  private static final int SHORT_TO_FLOAT   = DSCODE.SHORT << 8 | DSCODE.FLOAT;
-  private static final int SHORT_TO_DOUBLE  = DSCODE.SHORT << 8 | DSCODE.DOUBLE;
-
-  private static final int LONG_TO_BYTE     = DSCODE.LONG << 8 | DSCODE.BYTE;
-  private static final int LONG_TO_SHORT    = DSCODE.LONG << 8 | DSCODE.SHORT;
-  private static final int LONG_TO_INT      = DSCODE.LONG << 8 | DSCODE.INTEGER;
-  private static final int LONG_TO_LONG     = DSCODE.LONG << 8 | DSCODE.LONG;
-  private static final int LONG_TO_FLOAT    = DSCODE.LONG << 8 | DSCODE.FLOAT;
-  private static final int LONG_TO_DOUBLE   = DSCODE.LONG << 8 | DSCODE.DOUBLE;
-
-  private static final int INT_TO_BYTE      = DSCODE.INTEGER<< 8 | DSCODE.BYTE;
-  private static final int INT_TO_SHORT     = DSCODE.INTEGER<< 8 | DSCODE.SHORT;
-  private static final int INT_TO_INT       = DSCODE.INTEGER<< 8 | DSCODE.INTEGER;
-  private static final int INT_TO_LONG      = DSCODE.INTEGER<< 8 | DSCODE.LONG;
-  private static final int INT_TO_FLOAT     = DSCODE.INTEGER<< 8 | DSCODE.FLOAT;
-  private static final int INT_TO_DOUBLE    = DSCODE.INTEGER<< 8 | DSCODE.DOUBLE;
-
-  private static final int FLOAT_TO_BYTE    = DSCODE.FLOAT << 8 | DSCODE.BYTE;
-  private static final int FLOAT_TO_SHORT   = DSCODE.FLOAT << 8 | DSCODE.SHORT;
-  private static final int FLOAT_TO_INT     = DSCODE.FLOAT << 8 | DSCODE.INTEGER;
-  private static final int FLOAT_TO_LONG    = DSCODE.FLOAT << 8 | DSCODE.LONG;
-  private static final int FLOAT_TO_FLOAT   = DSCODE.FLOAT << 8 | DSCODE.FLOAT;
-  private static final int FLOAT_TO_DOUBLE  = DSCODE.FLOAT << 8 | DSCODE.DOUBLE;
-
-  private static final int DOUBLE_TO_BYTE   = DSCODE.DOUBLE << 8 | DSCODE.BYTE;
-  private static final int DOUBLE_TO_SHORT  = DSCODE.DOUBLE << 8 | DSCODE.SHORT;
-  private static final int DOUBLE_TO_INT    = DSCODE.DOUBLE << 8 | DSCODE.INTEGER;
-  private static final int DOUBLE_TO_LONG   = DSCODE.DOUBLE << 8 | DSCODE.LONG;
-  private static final int DOUBLE_TO_FLOAT  = DSCODE.DOUBLE << 8 | DSCODE.FLOAT;
-  private static final int DOUBLE_TO_DOUBLE = DSCODE.DOUBLE << 8 | DSCODE.DOUBLE;
-
-  //////////////////////////////////////////////////////////////////////////////
-  //
-  // constants for any-to-any string comparisons
-  //
-  //////////////////////////////////////////////////////////////////////////////
-
-  private static final int STRING_TO_STRING = DSCODE.STRING << 8 | DSCODE.STRING;
-  private static final int STRING_TO_STRING_BYTES = DSCODE.STRING << 8 | DSCODE.STRING_BYTES;
-  private static final int STRING_BYTES_TO_STRING = DSCODE.STRING_BYTES << 8 | DSCODE.STRING;
-  private static final int STRING_BYTES_TO_STRING_BYTES = DSCODE.STRING_BYTES << 8 | DSCODE.STRING_BYTES;
-
-  @Override
-  public int compare(byte[] o1, byte[] o2) {
-    return compare(o1, 0, o1.length, o2, 0, o2.length);
-  }
-
-  /**
-   * Compares two serialized values. The first byte of each value is its type
-   * code; matching string, numeric, boolean and char pairs are compared
-   * directly on the serialized bytes, a null on either side is ordered last,
-   * and everything else is deserialized and compared as {@link Comparable}.
-   *
-   * @throws ClassCastException if the values cannot be compared
-   */
-  @Override
-  public int compare(byte[] b1, int o1, int l1, byte[] b2, int o2, int l2) {
-    byte type1 = b1[o1];
-    byte type2 = b2[o2];
-
-    // optimized comparisons
-    if (isString(type1) && isString(type2)) {
-      return compareAsString(type1, b1, o1, type2, b2, o2);
-
-    } else if (isNumeric(type1) && isNumeric(type2)) {
-      return compareAsNumeric(type1, b1, o1, type2, b2, o2);
-
-    } else if (type1 == DSCODE.BOOLEAN && type2 == DSCODE.BOOLEAN) {
-      return compareAsBoolean(getBoolean(b1, o1), getBoolean(b2, o2));
-
-    } else if (type1 == DSCODE.CHARACTER && type2 == DSCODE.CHARACTER) {
-      return compareAsChar(getChar(b1, o1), getChar(b2, o2));
-
-    } else if (type1 == DSCODE.NULL || type2 == DSCODE.NULL) {
-      // null check, assumes NULLs sort last
-      return type1 == type2 ? 0 : type1 == DSCODE.NULL ? 1 : -1;
-    }
-
-    // fallback, will deserialize to Comparable
-    return compareAsObject(b1, o1, l1, b2, o2, l2);
-  }
-
-  private static boolean isNumeric(int type) {
-    return type == DSCODE.BYTE
-        || type == DSCODE.SHORT
-        || type == DSCODE.INTEGER
-        || type == DSCODE.LONG
-        || type == DSCODE.FLOAT
-        || type == DSCODE.DOUBLE;
-  }
-
-  private static boolean isString(int type) {
-    return type == DSCODE.STRING
-        || type == DSCODE.STRING_BYTES;
-  }
-
-  //////////////////////////////////////////////////////////////////////////////
-  //
-  // type comparisons
-  //
-  //////////////////////////////////////////////////////////////////////////////
-
-  /** Dispatches on the (type1, type2) pair to the matching string comparison. */
-  private static int compareAsString(byte type1, byte[] b1, int o1, byte type2, byte[] b2, int o2) {
-    // TODO these comparisons do not provide true alphabetical collation
-    // support (for example upper case sort before lower case). Need to use a
-    // collation key instead of unicode ordinal number comparison
-    switch (type1 << 8 | type2) {
-    case STRING_TO_STRING:
-      return compareAsStringOfUtf(b1, o1, b2, o2);
-
-    case STRING_TO_STRING_BYTES:
-      // the helper expects the byte-encoded string first, so swap and negate
-      return -compareAsStringOfByteToUtf(b2, o2, b1, o1);
-
-    case STRING_BYTES_TO_STRING:
-      return compareAsStringOfByteToUtf(b1, o1, b2, o2);
-
-    case STRING_BYTES_TO_STRING_BYTES:
-      return compareAsStringOfByte(b1, o1, b2, o2);
-
-    default:
-      throw new ClassCastException(String.format("Incomparable types: %d %d", type1, type2));
-    }
-  }
-
-  /**
-   * Dispatches on the (type1, type2) pair, comparing in a type wide enough
-   * for both operands.
-   */
-  private static int compareAsNumeric(byte type1, byte[] b1, int o1, byte type2, byte[] b2, int o2) {
-    switch (type1 << 8 | type2) {
-    case BYTE_TO_BYTE:     return compareAsShort (getByte  (b1, o1), getByte  (b2, o2));
-    case BYTE_TO_SHORT:    return compareAsShort (getByte  (b1, o1), getShort (b2, o2));
-    case BYTE_TO_INT:      return compareAsInt   (getByte  (b1, o1), getInt   (b2, o2));
-    case BYTE_TO_LONG:     return compareAsLong  (getByte  (b1, o1), getLong  (b2, o2));
-    case BYTE_TO_FLOAT:    return compareAsFloat (getByte  (b1, o1), getFloat (b2, o2));
-    case BYTE_TO_DOUBLE:   return compareAsDouble(getByte  (b1, o1), getDouble(b2, o2));
-
-    case SHORT_TO_BYTE:    return compareAsShort (getShort (b1, o1), getByte  (b2, o2));
-    case SHORT_TO_SHORT:   return compareAsShort (getShort (b1, o1), getShort (b2, o2));
-    case SHORT_TO_INT:     return compareAsInt   (getShort (b1, o1), getInt   (b2, o2));
-    case SHORT_TO_LONG:    return compareAsLong  (getShort (b1, o1), getLong  (b2, o2));
-    case SHORT_TO_FLOAT:   return compareAsFloat (getShort (b1, o1), getFloat (b2, o2));
-    case SHORT_TO_DOUBLE:  return compareAsDouble(getShort (b1, o1), getDouble(b2, o2));
-
-    case INT_TO_BYTE:      return compareAsInt   (getInt   (b1, o1), getByte  (b2, o2));
-    case INT_TO_SHORT:     return compareAsInt   (getInt   (b1, o1), getShort (b2, o2));
-    case INT_TO_INT:       return compareAsInt   (getInt   (b1, o1), getInt   (b2, o2));
-    case INT_TO_LONG:      return compareAsLong  (getInt   (b1, o1), getLong  (b2, o2));
-    case INT_TO_FLOAT:     return compareAsFloat (getInt   (b1, o1), getFloat (b2, o2));
-    case INT_TO_DOUBLE:    return compareAsDouble(getInt   (b1, o1), getDouble(b2, o2));
-
-    case LONG_TO_BYTE:     return compareAsLong  (getLong  (b1, o1), getByte  (b2, o2));
-    case LONG_TO_SHORT:    return compareAsLong  (getLong  (b1, o1), getShort (b2, o2));
-    case LONG_TO_INT:      return compareAsLong  (getLong  (b1, o1), getInt   (b2, o2));
-    case LONG_TO_LONG:     return compareAsLong  (getLong  (b1, o1), getLong  (b2, o2));
-    case LONG_TO_FLOAT:    return compareAsDouble(getLong  (b1, o1), getFloat (b2, o2));
-    case LONG_TO_DOUBLE:   return compareAsDouble(getLong  (b1, o1), getDouble(b2, o2));
-
-    case FLOAT_TO_BYTE:    return compareAsFloat (getFloat (b1, o1), getByte  (b2, o2));
-    case FLOAT_TO_SHORT:   return compareAsFloat (getFloat (b1, o1), getShort (b2, o2));
-    case FLOAT_TO_INT:     return compareAsFloat (getFloat (b1, o1), getInt   (b2, o2));
-    // compared as double, mirroring LONG_TO_FLOAT, so that compare(a, b) and
-    // compare(b, a) always agree in sign
-    case FLOAT_TO_LONG:    return compareAsDouble(getFloat (b1, o1), getLong  (b2, o2));
-    case FLOAT_TO_FLOAT:   return compareAsFloat (getFloat (b1, o1), getFloat (b2, o2));
-    case FLOAT_TO_DOUBLE:  return compareAsDouble(getFloat (b1, o1), getDouble(b2, o2));
-
-    case DOUBLE_TO_BYTE:   return compareAsDouble(getDouble(b1, o1), getByte  (b2, o2));
-    case DOUBLE_TO_SHORT:  return compareAsDouble(getDouble(b1, o1), getShort (b2, o2));
-    case DOUBLE_TO_INT:    return compareAsDouble(getDouble(b1, o1), getInt   (b2, o2));
-    case DOUBLE_TO_LONG:   return compareAsDouble(getDouble(b1, o1), getLong  (b2, o2));
-    case DOUBLE_TO_FLOAT:  return compareAsDouble(getDouble(b1, o1), getFloat (b2, o2));
-    case DOUBLE_TO_DOUBLE: return compareAsDouble(getDouble(b1, o1), getDouble(b2, o2));
-
-    default:
-      throw new ClassCastException(String.format("Incomparable types: %d %d", type1, type2));
-    }
-  }
-
-  private static int compareAsBoolean(boolean b1, boolean b2) {
-    return (b1 == b2) ? 0 : (b1 ? 1 : -1);
-  }
-
-  private static int compareAsShort(short s1, short s2) {
-    // operands are promoted to int, so the subtraction cannot overflow
-    return s1 - s2;
-  }
-
-  private static int compareAsChar(char c1, char c2) {
-    // TODO non-collating sort
-    return c1 - c2;
-  }
-
-  private static int compareAsInt(long l1, long l2) {
-    // The difference of two ints can exceed the int range (for example
-    // Integer.MIN_VALUE vs Integer.MAX_VALUE); truncating it to int would flip
-    // the sign, so compare the values directly instead of subtracting.
-    return (l1 < l2) ? -1 : ((l1 == l2) ? 0 : 1);
-  }
-
-  private static int compareAsLong(long l1, long l2) {
-    return (l1 < l2) ? -1 : ((l1 == l2) ? 0 : 1);
-  }
-
-  private static int compareAsFloat(float f1, float f2) {
-    return Float.compare(f1, f2);
-  }
-
-  private static int compareAsDouble(double d1, double d2) {
-    return Double.compare(d1, d2);
-  }
-
-  /**
-   * Compares two strings whose serialized form is read here as: type code,
-   * two-byte unsigned length, then one byte per character.
-   */
-  private static int compareAsStringOfByte(byte[] b1, int o1, byte[] b2, int o2) {
-    int offset = 3;
-    int l1 = Bytes.toUnsignedShort(b1[o1 + 1], b1[o1 + 2]);
-    // the second operand's length lives at its own offset (o2, not o1)
-    int l2 = Bytes.toUnsignedShort(b2[o2 + 1], b2[o2 + 2]);
-
-    assert b1.length >= o1 + offset + l1;
-    assert b2.length >= o2 + offset + l2;
-
-    int end = o1 + offset + Math.min(l1, l2);
-    for (int i = o1 + offset, j = o2 + offset; i < end; i++, j++) {
-      int diff = b1[i] - b2[j];
-      if (diff != 0) {
-        return diff;
-      }
-    }
-    return l1 - l2;
-  }
-
-  /**
-   * Compares two strings whose serialized form is read here as: type code,
-   * two-byte unsigned byte count, then modified UTF-8 data.
-   */
-  private static int compareAsStringOfUtf(byte[] b1, int o1, byte[] b2, int o2) {
-    int offset = 3;
-    int l1 = Bytes.toUnsignedShort(b1[o1 + 1], b1[o1 + 2]);
-    // the second operand's length lives at its own offset (o2, not o1)
-    int l2 = Bytes.toUnsignedShort(b2[o2 + 1], b2[o2 + 2]);
-
-    assert b1.length >= o1 + offset + l1;
-    assert b2.length >= o2 + offset + l2;
-
-    int i = 0;
-    int j = 0;
-    while (i < l1 && j < l2) {
-      final int idx = o1 + offset + i;
-      final int ilen = getUtfLength(b1[idx]);
-      final char c1 = getUtfChar(b1, idx, ilen);
-      i += ilen;
-
-      final int jdx = o2 + offset + j;
-      final int jlen = getUtfLength(b2[jdx]);
-      char c2 = getUtfChar(b2, jdx, jlen);
-      j += jlen;
-
-      int diff = compareAsChar(c1, c2);
-      if (diff != 0) {
-        return diff;
-      }
-    }
-    // all shared characters matched: whichever side has bytes left is greater
-    return (l1 - i) - (l2 - j);
-  }
-
-  /**
-   * Compares a one-byte-per-character string (first operand) against a
-   * modified UTF-8 string (second operand).
-   */
-  private static int compareAsStringOfByteToUtf(byte[] b1, int o1, byte[] b2, int o2) {
-    int offset = 3;
-    int l1 = Bytes.toUnsignedShort(b1[o1 + 1], b1[o1 + 2]);
-    // the second operand's length lives at its own offset (o2, not o1)
-    int l2 = Bytes.toUnsignedShort(b2[o2 + 1], b2[o2 + 2]);
-
-    assert b1.length >= o1 + offset + l1;
-    assert b2.length >= o2 + offset + l2;
-
-    int i = 0;
-    int j = 0;
-    while (i < l1 && j < l2) {
-      final int idx = o1 + offset + i;
-      final char c1 = (char) b1[idx];
-      i++;
-
-      final int jdx = o2 + offset + j;
-      final int jlen = getUtfLength(b2[jdx]);
-      char c2 = getUtfChar(b2, jdx, jlen);
-      j += jlen;
-
-      int diff = compareAsChar(c1, c2);
-      if (diff != 0) {
-        return diff;
-      }
-    }
-    return (l1 - i) - (l2 - j);
-  }
-
-  /**
-   * Deserializes both values and compares them through {@link Comparable}.
-   * Any failure is rethrown as a ClassCastException carrying the cause.
-   */
-  private static int compareAsObject(byte[] b1, int o1, int l1, byte[] b2, int o2, int l2) {
-    DataInput in1 = new DataInputStream(new ByteArrayInputStream(b1, o1, l1));
-    DataInput in2 = new DataInputStream(new ByteArrayInputStream(b2, o2, l2));
-
-    try {
-      Comparable<Object> obj1 = DataSerializer.readObject(in1);
-      Comparable<Object> obj2 = DataSerializer.readObject(in2);
-
-      return obj1.compareTo(obj2);
-
-    } catch (Exception e) {
-      throw (RuntimeException) new ClassCastException().initCause(e);
-    }
-  }
-
-  //////////////////////////////////////////////////////////////////////////////
-  //
-  //
-  // Get a char from modified UTF8, as defined by DataInput.readUTF().
-  //
-  //////////////////////////////////////////////////////////////////////////////
-
-  /** Returns the encoded length (1, 2 or 3 bytes) implied by the lead byte. */
-  private static int getUtfLength(byte b) {
-    int c = b & 0xff;
-
-    // 0xxxxxxx
-    if (c < 0x80) {
-      return 1;
-
-    // 110xxxxx 10xxxxxx
-    } else if (c < 0xe0) {
-      return 2;
-    }
-
-    // 1110xxxx 10xxxxxx 10xxxxxx
-    return 3;
-  }
-
-  private static char getUtfChar(byte[] b, int off, int len) {
-    assert b.length >= off + len;
-    switch (len) {
-    case 1:
-      return (char) b[off];
-    case 2:
-      return getUtf2(b, off);
-    case 3:
-    default:
-      return getUtf3(b, off);
-    }
-  }
-
-  private static char getUtf2(byte[] b, int off) {
-    assert b.length >= off + 2;
-    assert (b[off] & 0xff) >= 0xc0;
-    assert (b[off] & 0xff) < 0xe0;
-    assert (b[off + 1] & 0xff) >= 0x80;
-
-    return (char) (((b[off] & 0x1f) << 6) | (b[off + 1] & 0x3f));
-  }
-
-  private static char getUtf3(byte[] b, int off) {
-    assert b.length >= off + 3;
-    assert (b[off] & 0xff) >= 0xe0;
-    assert (b[off + 1] & 0xff) >= 0x80;
-    assert (b[off + 2] & 0xff) >= 0x80;
-
-    return (char) (((b[off] & 0x0f) << 12) | ((b[off + 1] & 0x3f) << 6) | (b[off + 2] & 0x3f));
-  }
-
-
-  //////////////////////////////////////////////////////////////////////////////
-  //
-  // Get a serialized primitive from byte[]; b[0] is the DSCODE.
-  //
-  //////////////////////////////////////////////////////////////////////////////
-
-  private static boolean getBoolean(byte[] b, int off) {
-    assert b.length >= off + 2;
-    return b[off + 1] != 0;
-  }
-
-  private static byte getByte(byte[] b, int off) {
-    assert b.length >= off + 2;
-    return b[off + 1];
-  }
-
-  private static short getShort(byte[] b, int off) {
-    assert b.length >= off + 3;
-    return Bytes.toShort(b[off + 1], b[off + 2]);
-  }
-
-  private static char getChar(byte[] b, int off) {
-    assert b.length >= off + 3;
-    return Bytes.toChar(b[off + 1], b[off + 2]);
-  }
-
-  private static int getInt(byte[] b, int off) {
-    assert b.length >= off + 5;
-    return Bytes.toInt(b[off + 1], b[off + 2], b[off + 3], b[off + 4]);
-  }
-
-  private static long getLong(byte[] b, int off) {
-    assert b.length >= off + 9;
-    return Bytes.toLong(b[off + 1], b[off + 2], b[off + 3], b[off + 4],
-                        b[off + 5], b[off + 6], b[off + 7], b[off + 8]);
-  }
-
-  private static float getFloat(byte[] b, int off) {
-    assert b.length >= off + 5;
-    return Float.intBitsToFloat(getInt(b, off));
-  }
-
-  private static double getDouble(byte[] b, int off) {
-    assert b.length >= off + 9;
-    return Double.longBitsToDouble(getLong(b, off));
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9438c8b1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/NonCompactor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/NonCompactor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/NonCompactor.java
deleted file mode 100644
index 697ac18..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/NonCompactor.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Deque;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplog.SortedOplogReader;
-
-/**
- * Provides a compactor that does no compaction, primarily for testing purposes.
- * Added soplogs are kept in a deque, newest first, until the compactor is
- * cleared or closed. All access to the deque is guarded by the instance
- * monitor.
- *
- * @author bakera
- */
-public class NonCompactor implements Compactor {
-  /** the fileset */
-  private final Fileset<Integer> fileset;
-
-  /** the current readers, newest first; guarded by the instance monitor */
-  private final Deque<TrackedReference<SortedOplogReader>> readers;
-
-  /**
-   * Creates a fileset that recovers nothing and hands out soplog file names
-   * built from the name, the current time, and a per-fileset counter.
-   *
-   * @param name the prefix for generated file names
-   * @param dir the directory in which generated files are placed
-   * @return the fileset
-   */
-  public static Fileset<Integer> createFileset(final String name, final File dir) {
-    return new Fileset<Integer>() {
-      /** distinguishes files generated within the same millisecond */
-      private final AtomicLong file = new AtomicLong(0);
-
-      @Override
-      public SortedMap<Integer, ? extends Iterable<File>> recover() {
-        // always reports an empty set of files
-        return new TreeMap<Integer, Iterable<File>>();
-      }
-
-      @Override
-      public File getNextFilename() {
-        return new File(dir, name + "-" + System.currentTimeMillis() + "-"
-            + file.getAndIncrement() + ".soplog");
-      }
-    };
-  }
-
-  /**
-   * Creates a compactor backed by a fileset from
-   * {@link #createFileset(String, File)}.
-   *
-   * @param name the prefix for generated file names
-   * @param dir the directory in which generated files are placed
-   */
-  public NonCompactor(String name, File dir) {
-    fileset = createFileset(name, dir);
-    readers = new ArrayDeque<TrackedReference<SortedOplogReader>>();
-  }
-
-  /**
-   * Does nothing.
-   *
-   * @return always {@code true}, even though no compaction took place
-   */
-  @Override
-  public boolean compact() throws IOException {
-    // liar!
-    return true;
-  }
-
-  /**
-   * Does nothing; both arguments are ignored and the handler is never invoked.
-   */
-  @Override
-  public void compact(boolean force, CompactionHandler cd) {
-  }
-
-  /**
-   * Returns a copy of the current readers after incrementing each reference.
-   * The range arguments are ignored.
-   *
-   * @param start ignored
-   * @param end ignored
-   * @return a new collection holding every current reader, newest first
-   */
-  @Override
-  public synchronized Collection<TrackedReference<SortedOplogReader>> getActiveReaders(
-      byte[] start, byte[] end) {
-    for (TrackedReference<SortedOplogReader> tr : readers) {
-      tr.increment();
-    }
-    return new ArrayList<TrackedReference<SortedOplogReader>>(readers);
-  }
-
-  /**
-   * Creates a reader for the soplog and places it at the front of the deque.
-   *
-   * @param soplog the soplog to add
-   * @throws IOException if the reader cannot be created
-   */
-  @Override
-  public void add(SortedOplog soplog) throws IOException {
-    // Build the reference first so the monitor is held only for the deque
-    // update; the deque itself is not thread-safe and every other accessor
-    // already synchronizes on this instance.
-    TrackedReference<SortedOplogReader> ref =
-        new TrackedReference<SortedOplogReader>(soplog.createReader());
-    synchronized (this) {
-      readers.addFirst(ref);
-    }
-  }
-
-  /**
-   * Closes and removes every reader, one at a time starting at the head. If
-   * closing a reader fails, that reader and the ones behind it stay in the
-   * deque.
-   *
-   * @throws IOException if a reader cannot be closed
-   */
-  @Override
-  public synchronized void clear() throws IOException {
-    // Drain from the head rather than removing inside a for-each loop: the
-    // deque must not be modified while one of its iterators is in use.
-    TrackedReference<SortedOplogReader> tr;
-    while ((tr = readers.peekFirst()) != null) {
-      tr.get().close();
-      readers.removeFirst();
-    }
-  }
-
-  /**
-   * Closes the compactor by clearing all readers.
-   *
-   * @throws IOException if a reader cannot be closed
-   */
-  @Override
-  public synchronized void close() throws IOException {
-    clear();
-  }
-
-  /**
-   * @return always {@code null}; this compactor does not track compactions
-   */
-  @Override
-  public CompactionTracker<Integer> getTracker() {
-    return null;
-  }
-
-  /**
-   * @return the fileset created for this compactor
-   */
-  @Override
-  public Fileset<Integer> getFileset() {
-    return fileset;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9438c8b1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ReversingSerializedComparator.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ReversingSerializedComparator.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ReversingSerializedComparator.java
deleted file mode 100644
index b18919d..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ReversingSerializedComparator.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SerializedComparator;
-
-/**
- * Reverses the ordering imposed by the underlying comparator. Use this to
- * change from an ascending to a descending order or vice versa.
- * <p>
- * Prior to use, an instance must be configured with a comparator for delegation
- * of the comparison operations, either through
- * {@link #setComparators(SerializedComparator[])} or by creating the instance
- * with {@link #reverse(SerializedComparator)}.
- *
- * @author bakera
- */
-public class ReversingSerializedComparator implements DelegatingSerializedComparator {
-  /** the comparator whose ordering is reversed */
-  private volatile SerializedComparator delegate;
-
-  /**
-   * Sets the comparator to delegate to.
-   *
-   * @param sc an array holding exactly one comparator
-   */
-  @Override
-  public void setComparators(SerializedComparator[] sc) {
-    // exactly one delegate is expected; the element at index 0 is used
-    assert sc.length == 1 : "Expected exactly one comparator but got " + sc.length;
-    delegate = sc[0];
-  }
-
-  /**
-   * @return a new single-element array holding the delegate
-   */
-  @Override
-  public SerializedComparator[] getComparators() {
-    return new SerializedComparator[] { delegate };
-  }
-
-  /**
-   * Compares two complete byte arrays in reversed order.
-   */
-  @Override
-  public int compare(byte[] o1, byte[] o2) {
-    return compare(o1, 0, o1.length, o2, 0, o2.length);
-  }
-
-  /**
-   * Compares two byte ranges in reversed order by handing them to the
-   * delegate with the operands swapped.
-   */
-  @Override
-  public int compare(byte[] b1, int o1, int l1, byte[] b2, int o2, int l2) {
-    return delegate.compare(b2, o2, l2, b1, o1, l1);
-  }
-
-  /**
-   * Returns a comparator that reverses the ordering imposed by the supplied
-   * comparator.
-   *
-   * @param sc the original comparator
-   * @return the reversed comparator
-   */
-  public static SerializedComparator reverse(SerializedComparator sc) {
-    ReversingSerializedComparator rev = new ReversingSerializedComparator();
-    rev.delegate = sc;
-
-    return rev;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9438c8b1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SizeTieredCompactor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SizeTieredCompactor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SizeTieredCompactor.java
deleted file mode 100644
index 5976ad0..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SizeTieredCompactor.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplog.SortedOplogReader;
-
-/**
- * Implements a size-tiered compaction scheme in which the soplogs are organized
- * by levels of increasing size. Each level is limited to a fixed number of
- * files, <code>M</code>. Given an initial size of <code>N</code> the amount of
- * disk space consumed by a level <code>L</code> is <code>M * N^(L+1)</code>.
- * <p>
- * During compaction, this approach will temporarily double the amount of space
- * consumed by the level. Compactions are performed on a background thread.
- * <p>
- * Soplogs that have been compacted will be moved to the inactive list where they
- * will be deleted once they are no longer in use.
- *
- * @author bakera
- */
-public class SizeTieredCompactor extends AbstractCompactor<Integer> {
- /** restricts the number of soplogs per level */
- private final int maxFilesPerLevel;
-
- // TODO consider relaxing the upper bound so the levels are created dynamically
- /** restricts the number of levels; files in maxLevel are not compacted */
- private final int maxLevels;
-
- /**
- * Creates the compactor, builds one {@link OrderedLevel} per level, and adds
- * every file reported by <code>fileset.recover()</code> to its level.
- *
- * @param factory creates a soplog for each recovered file
- * @param fileset supplies the recovered files, keyed by level
- * @param tracker passed to the superclass constructor
- * @param exec passed to the superclass constructor
- * @param maxFilesPerLevel the per-level file limit; asserted to be positive
- * @param maxLevels the number of levels; asserted to be positive
- * @throws IOException declared by the superclass constructor and by adding
- * recovered soplogs
- */
- public SizeTieredCompactor(SortedOplogFactory factory,
- Fileset<Integer> fileset, CompactionTracker<Integer> tracker,
- Executor exec, int maxFilesPerLevel, int maxLevels)
- throws IOException {
- super(factory, fileset, tracker, exec);
-
- assert maxFilesPerLevel > 0;
- assert maxLevels > 0;
-
- this.maxFilesPerLevel = maxFilesPerLevel;
- this.maxLevels = maxLevels;
-
- final boolean isDebugEnabled = logger.isDebugEnabled();
- if (isDebugEnabled) {
- logger.debug("{}Creating size-tiered compactor", super.logPrefix);
- }
-
- for (int i = 0; i < maxLevels; i++) {
- levels.add(new OrderedLevel(i));
- }
-
- for (Map.Entry<Integer, ? extends Iterable<File>> entry : fileset.recover().entrySet()) {
- // recovered level keys beyond the last level are clamped to the last level
- int level = Math.min(maxLevels - 1, entry.getKey());
- for (File f : entry.getValue()) {
- if (isDebugEnabled) {
- logger.debug("{}Adding {} to level {}", super.logPrefix, f, level);
- }
- levels.get(level).add(factory.createSortedOplog(f));
- }
- }
- }
-
- /**
- * @return the configured name followed by
- * <code>&lt;maxFilesPerLevel/maxLevels&gt;</code>
- */
- @Override
- public String toString() {
- return String.format("%s <%d/%d>", factory.getConfiguration().getName(), maxFilesPerLevel, maxLevels);
- }
-
- /**
- * Organizes a set of soplogs for a given level. All operations on the
- * soplogs are synchronized via the instance monitor.
- */
- protected class OrderedLevel extends Level {
- /** the ordered set of soplog readers */
- private final Deque<TrackedReference<SortedOplogReader>> soplogs;
-
- /** true if the level is being compacted */
- private final AtomicBoolean isCompacting;
-
- /**
- * Creates an empty level.
- *
- * @param level the level number, passed to the superclass
- */
- public OrderedLevel(int level) {
- super(level);
- soplogs = new ArrayDeque<TrackedReference<SortedOplogReader>>(maxFilesPerLevel);
- isCompacting = new AtomicBoolean(false);
- }
-
- /**
- * @return true when the level is not already being compacted, holds at
- * least <code>maxFilesPerLevel</code> soplogs, and is not the last level
- */
- @Override
- protected synchronized boolean needsCompaction() {
- // TODO this is safe but overly conservative...we need to allow parallel
- // compaction of a level such that we guarantee completion order and handle
- // errors
- return !isCompacting.get()
- && soplogs.size() >= maxFilesPerLevel
- && level != maxLevels - 1;
- }
-
- /**
- * Copies the current soplog references and increments each one.
- *
- * @param start ignored
- * @param end ignored
- * @return a new list holding every soplog reference in this level
- */
- @Override
- protected List<TrackedReference<SortedOplogReader>> getSnapshot(byte[] start, byte[] end) {
- // ignoring range limits since keys are stored in overlapping files
- List<TrackedReference<SortedOplogReader>> snap;
- synchronized (this) {
- snap = new ArrayList<TrackedReference<SortedOplogReader>>(soplogs);
- }
-
- // the increments happen after the monitor has been released
- for (TrackedReference<SortedOplogReader> tr : snap) {
- tr.increment();
- }
- return snap;
- }
-
- /**
- * Closes every reader, passes the soplogs to <code>markAsInactive</code>,
- * and empties the level.
- */
- @Override
- protected synchronized void clear() throws IOException {
- for (TrackedReference<SortedOplogReader> tr : soplogs) {
- tr.get().close();
- }
- markAsInactive(soplogs, level);
- soplogs.clear();
- }
-
- /**
- * Closes every reader, decrementing the active file statistic once per
- * reader, and empties the level. Unlike <code>clear()</code>, the soplogs
- * are not passed to <code>markAsInactive</code>.
- */
- @Override
- protected synchronized void close() throws IOException {
- for (TrackedReference<SortedOplogReader> tr : soplogs) {
- tr.get().close();
- factory.getConfiguration().getStatistics().incActiveFiles(-1);
- }
- soplogs.clear();
- }
-
- /**
- * Creates a reader for the soplog and places it at the front of the level,
- * then notifies the tracker and increments the active file statistic.
- */
- @Override
- protected void add(SortedOplog soplog) throws IOException {
- // the reader is created before the monitor is taken
- SortedOplogReader rdr = soplog.createReader();
- synchronized (this) {
- soplogs.addFirst(new TrackedReference<SortedOplogReader>(rdr));
- }
-
- if (logger.isDebugEnabled()) {
- logger.debug("{}Added file {} to level {}", SizeTieredCompactor.super.logPrefix, rdr, level);
- }
- tracker.fileAdded(rdr.getFile(), level);
- factory.getConfiguration().getStatistics().incActiveFiles(1);
- }
-
- /**
- * Merges a snapshot of this level and adds the result, if any, to the next
- * level (or to this level when it is already the last one).
- *
- * @param aborted passed through to <code>merge</code>
- * @return false if another compaction of this level is in progress or the
- * merge was interrupted; true otherwise
- */
- @Override
- protected boolean compact(AtomicBoolean aborted) throws IOException {
- assert level < maxLevels : "Can't compact level: " + level;
-
- if (!isCompacting.compareAndSet(false, true)) {
- // another thread won so gracefully bow out
- return false;
- }
-
- try {
- // NOTE(review): getSnapshot increments every reference and nothing in
- // this method releases them; presumably markAsInactive or the reader
- // lifecycle does so - confirm, especially for the interrupted path
- List<TrackedReference<SortedOplogReader>> snapshot = getSnapshot(null, null);
- try {
- SortedOplog merged = merge(snapshot, level == maxLevels - 1, aborted);
-
- // publish the merged soplog and retire the inputs under this level's monitor
- synchronized (this) {
- if (merged != null) {
- levels.get(Math.min(level + 1, maxLevels - 1)).add(merged);
- }
- markAsInactive(snapshot, level);
- soplogs.removeAll(snapshot);
- }
- } catch (InterruptedIOException e) {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Aborting compaction of level {}", SizeTieredCompactor.super.logPrefix, level);
- }
- return false;
- }
- return true;
- } finally {
- // always release the compaction flag, including when merge throws
- boolean set = isCompacting.compareAndSet(true, false);
- assert set;
- }
- }
- }
-}