You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ab...@apache.org on 2015/11/12 23:45:49 UTC
[1/3] incubator-geode git commit: GEODE-544: Removes soplog code and
tests
Repository: incubator-geode
Updated Branches:
refs/heads/feature/GEODE-544 [created] f95eb6832
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/nofile/NoFileSortedOplog.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/nofile/NoFileSortedOplog.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/nofile/NoFileSortedOplog.java
deleted file mode 100644
index 8995f66..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/nofile/NoFileSortedOplog.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog.nofile;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.NavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicReference;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.AbstractSortedReader;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedBuffer.BufferIterator;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplog;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogFactory.SortedOplogConfiguration;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.Metadata;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SortedStatistics;
-
-public class NoFileSortedOplog implements SortedOplog {
- private final SortedOplogConfiguration config;
-
- private final AtomicReference<NavigableMap<byte[], byte[]>> data;
- private final EnumMap<Metadata, byte[]> metadata;
- private final NoFileStatistics stats;
-
- public NoFileSortedOplog(SortedOplogConfiguration config) {
- this.config = config;
-
- data = new AtomicReference<NavigableMap<byte[],byte[]>>();
- metadata = new EnumMap<Metadata, byte[]>(Metadata.class);
- stats = new NoFileStatistics();
- }
-
- @Override
- public SortedOplogReader createReader() throws IOException {
- return new NoFileReader();
- }
-
- @Override
- public SortedOplogWriter createWriter() throws IOException {
- synchronized (metadata) {
- metadata.clear();
- }
- data.set(new ConcurrentSkipListMap<byte[], byte[]>(config.getComparator()));
-
- return new NoFileWriter();
- }
-
- private class NoFileWriter implements SortedOplogWriter {
- @Override
- public void append(byte[] key, byte[] value) throws IOException {
- if (data.get().put(key, value) == null) {
- stats.add(key.length, value.length);
- }
- }
-
- @Override
- public void append(ByteBuffer key, ByteBuffer value) throws IOException {
- byte[] k = new byte[key.remaining()];
- byte[] v = new byte[value.remaining()];
-
- key.get(k);
- value.get(v);
-
- append(k, v);
- }
-
- @Override
- public void close(EnumMap<Metadata, byte[]> meta) throws IOException {
- if (meta != null) {
- synchronized (metadata) {
- metadata.putAll(meta);
- }
- }
- }
-
- @Override
- public void closeAndDelete() throws IOException {
- data.get().clear();
- data.set(null);
- }
- }
-
- private class NoFileReader extends AbstractSortedReader implements SortedOplogReader {
- private final BloomFilter bloom;
- private volatile boolean closed;
-
- public NoFileReader() {
- closed = false;
- bloom = new BloomFilter() {
- @Override
- public boolean mightContain(byte[] key) {
- return data.get().containsKey(key);
- }
- };
- }
-
- @Override
- public boolean mightContain(byte[] key) throws IOException {
- return bloom.mightContain(key);
- }
-
- @Override
- public ByteBuffer read(byte[] key) throws IOException {
- return ByteBuffer.wrap(data.get().get(key));
- }
-
- @Override
- public SortedIterator<ByteBuffer> scan(
- byte[] from, boolean fromInclusive,
- byte[] to, boolean toInclusive,
- boolean ascending,
- MetadataFilter filter) {
-
- if (filter == null || filter.accept(metadata.get(filter.getName()))) {
- NavigableMap<byte[],byte[]> subset = ascending ? data.get() : data.get().descendingMap();
- if (from == null && to == null) {
- // we're good
- } else if (from == null) {
- subset = subset.headMap(to, toInclusive);
- } else if (to == null) {
- subset = subset.tailMap(from, fromInclusive);
- } else {
- subset = subset.subMap(from, fromInclusive, to, toInclusive);
- }
- return new BufferIterator(subset.entrySet().iterator());
- }
- return new BufferIterator(Collections.<byte[], byte[]>emptyMap().entrySet().iterator());
- }
-
- @Override
- public SerializedComparator getComparator() {
- return (SerializedComparator) data.get().comparator();
- }
-
- @Override
- public SortedStatistics getStatistics() {
- return stats;
- }
-
- @Override
- public boolean isClosed() {
- return closed;
- }
-
- @Override
- public void close() throws IOException {
- closed = true;
- }
-
- @Override
- public BloomFilter getBloomFilter() {
- return bloom;
- }
-
- @Override
- public byte[] getMetadata(Metadata name) {
- synchronized (metadata) {
- return metadata.get(name);
- }
- }
-
- @Override
- public File getFile() {
- return new File(".");
- }
-
- @Override
- public String getFileName() {
- return "name";
- }
-
- @Override
- public long getModificationTimeStamp() throws IOException {
- return 0;
- }
-
- @Override
- public void rename(String name) throws IOException {
- }
-
- @Override
- public void delete() throws IOException {
- }
- }
-
- private class NoFileStatistics implements SortedStatistics {
- private long keys;
- private double avgKeySize;
- private double avgValueSize;
-
- private synchronized void add(int keyLength, int valueLength) {
- avgKeySize = (keyLength + keys * avgKeySize) / (keys + 1);
- avgValueSize = (keyLength + keys * avgValueSize) / (keys + 1);
-
- keys++;
- }
-
- @Override
- public synchronized long keyCount() {
- return keys;
- }
-
- @Override
- public byte[] firstKey() {
- return data.get().firstKey();
- }
-
- @Override
- public byte[] lastKey() {
- return data.get().lastKey();
- }
-
- @Override
- public synchronized double avgKeySize() {
- return avgKeySize;
- }
-
- @Override
- public synchronized double avgValueSize() {
- return avgValueSize;
- }
-
- @Override
- public void close() {
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/nofile/NoFileSortedOplogFactory.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/nofile/NoFileSortedOplogFactory.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/nofile/NoFileSortedOplogFactory.java
deleted file mode 100644
index 365d796..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/nofile/NoFileSortedOplogFactory.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog.nofile;
-
-import java.io.File;
-import java.io.IOException;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplog;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogFactory;
-
-public class NoFileSortedOplogFactory implements SortedOplogFactory {
- private final SortedOplogConfiguration config;
-
- public NoFileSortedOplogFactory(String name) {
- config = new SortedOplogConfiguration(name);
- }
-
- @Override
- public SortedOplog createSortedOplog(File name) throws IOException {
- return new NoFileSortedOplog(config);
- }
-
- @Override
- public SortedOplogConfiguration getConfiguration() {
- return config;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/AppendLog.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/AppendLog.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/AppendLog.java
deleted file mode 100644
index e0e696a..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/AppendLog.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.io.BufferedOutputStream;
-import java.io.Closeable;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-
-public class AppendLog {
-
- public static AppendLogReader recover(File f) throws IOException {
- throw new RuntimeException("Not implemented");
- }
-
- public static AppendLogWriter create(File f) throws IOException {
- DataOutputStream dos = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(f)));
- return new AppendLogWriter(f, dos);
- }
-
- public static class AppendLogReader {
- }
-
- public static class AppendLogWriter implements Closeable {
- private final File file;
- private final DataOutputStream out;
-
- private AppendLogWriter(File f, DataOutputStream out) {
- this.file = f;
- this.out = out;
- }
-
- public synchronized void append(byte[] key, byte[] value) throws IOException {
- out.writeInt(key.length);
- out.writeInt(value.length);
- out.write(key);
- out.write(value);
- }
-
- @Override
- public void close() throws IOException {
- out.close();
- }
-
- public File getFile() {
- return file;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ArraySerializedComparatorJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ArraySerializedComparatorJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ArraySerializedComparatorJUnitTest.java
deleted file mode 100644
index 3689255..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ArraySerializedComparatorJUnitTest.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SerializedComparator;
-import com.gemstone.gemfire.test.junit.categories.UnitTest;
-
-@Category(UnitTest.class)
-public class ArraySerializedComparatorJUnitTest extends ComparisonTestCase {
- protected ArraySerializedComparator comp;
-
- public void testSearch() throws IOException {
- byte[] k1 = comp.createCompositeKey(convert("aaaa"), convert(1), convert(true));
- byte[] k2 = comp.createCompositeKey(convert("bbbb"), convert(2), convert(false));
- byte[] k3 = comp.createCompositeKey(convert("bbbb"), convert(3), convert(true));
- byte[] k4 = comp.createCompositeKey(convert("cccc"), convert(1), convert(false));
-
- byte[] s1 = comp.createCompositeKey(convert("aaaa"),
- new byte[] {SoplogToken.WILDCARD.toByte()}, new byte[] {SoplogToken.WILDCARD.toByte()});
-
-
- byte[] s2 = comp.createCompositeKey(convert("bbbb"),
- new byte[] {SoplogToken.WILDCARD.toByte()}, new byte[] {SoplogToken.WILDCARD.toByte()});
-
- byte[] s3 = comp.createCompositeKey(new byte[] {SoplogToken.WILDCARD.toByte()}, convert(1),
- new byte[] {SoplogToken.WILDCARD.toByte()});
-
- compareAsIs(comp, k1, s1, Comparison.EQ);
- compareAsIs(comp, k2, s1, Comparison.GT);
- compareAsIs(comp, k1, s2, Comparison.LT);
- compareAsIs(comp, k2, s2, Comparison.EQ);
- compareAsIs(comp, k3, s2, Comparison.EQ);
- compareAsIs(comp, k4, s2, Comparison.GT);
- compareAsIs(comp, s3, k4, Comparison.EQ);
- }
-
- public void testCompositeKey() throws IOException {
- byte[] k1 = comp.createCompositeKey(convert("aaaa"), convert(1), convert(true));
- byte[] k2 = comp.createCompositeKey(convert("bbbb"), convert(2), convert(false));
- byte[] k3 = comp.createCompositeKey(convert("bbbb"), convert(3), convert(true));
- byte[] k4 = comp.createCompositeKey(convert("cccc"), convert(1), convert(false));
- byte[] k5 = comp.createCompositeKey(convert(null), convert(1), convert(false));
- byte[] k6 = comp.createCompositeKey(convert(null), convert(1), convert(true));
-
- compareAsIs(comp, k1, k1, Comparison.EQ);
- compareAsIs(comp, k1, k2, Comparison.LT);
- compareAsIs(comp, k2, k1, Comparison.GT);
- compareAsIs(comp, k2, k3, Comparison.LT);
- compareAsIs(comp, k3, k4, Comparison.LT);
-
- compareAsIs(comp, k4, k5, Comparison.LT);
- compareAsIs(comp, k5, k1, Comparison.GT);
- compareAsIs(comp, k5, k6, Comparison.LT);
- }
-
- public void testGetKey() throws Exception {
- Object[] vals = new Object[] { "aaaa", 1, true };
-
- byte[][] composite = new byte[][] { convert(vals[0]), convert(vals[1]), convert(vals[2]) };
- ByteBuffer key = ByteBuffer.wrap(comp.createCompositeKey(composite));
-
- for (int i = 0; i < 3; i++) {
- ByteBuffer subkey = comp.getKey(key, i);
- assertEquals(vals[i], recover(subkey.array(), subkey.arrayOffset(), subkey.remaining()));
- }
- }
-
- public void setUp() {
- comp = new ArraySerializedComparator();
- comp.setComparators(new SerializedComparator[] {
- new LexicographicalComparator(), // string
- new LexicographicalComparator(), // int
- new LexicographicalComparator() // boolean
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/CompactionSortedOplogSetTestCase.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/CompactionSortedOplogSetTestCase.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/CompactionSortedOplogSetTestCase.java
deleted file mode 100644
index bca52a9..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/CompactionSortedOplogSetTestCase.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.Executors;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.CompactionTestCase.FileTracker;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.CompactionTestCase.WaitingHandler;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SortedIterator;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.nofile.NoFileSortedOplogFactory;
-
-public abstract class CompactionSortedOplogSetTestCase extends SortedOplogSetJUnitTest {
- public void testWithCompaction() throws IOException, InterruptedException {
- FlushCounter handler = new FlushCounter();
- SortedOplogSet sos = createSoplogSet("compact");
-
- for (int i = 0; i < 1000; i++) {
- sos.put(wrapInt(i), wrapInt(i));
- if (i % 100 == 0) {
- sos.flush(null, handler);
- }
- }
-
- flushAndWait(handler, sos);
-
- compactAndWait(sos, false);
- validate(sos, 1000);
- sos.close();
- }
-
- public void testTombstone() throws Exception {
- FlushCounter handler = new FlushCounter();
- SortedOplogFactory factory = new NoFileSortedOplogFactory("tombstone");
- Compactor compactor = new SizeTieredCompactor(factory,
- NonCompactor.createFileset("tombstone", new File(".")),
- new FileTracker(),
- Executors.newSingleThreadExecutor(),
- 2, 2);
-
- SortedOplogSet sos = new SortedOplogSetImpl(factory, Executors.newSingleThreadExecutor(), compactor);
-
- for (int i = 0; i < 1000; i++) {
- sos.put(wrapInt(i), wrapInt(i));
- }
- sos.flush(null, handler);
-
- for (int i = 900; i < 1000; i++) {
- sos.put(wrapInt(i), new byte[] {SoplogToken.TOMBSTONE.toByte()});
- }
- flushAndWait(handler, sos);
- compactAndWait(sos, true);
-
- validate(sos, 900);
- sos.close();
-
- }
-
- public void testInUse() throws Exception {
- FlushCounter handler = new FlushCounter();
- SortedOplogSet sos = createSoplogSet("inuse");
-
- for (int i = 0; i < 1000; i++) {
- sos.put(wrapInt(i), wrapInt(i));
- }
-
- flushAndWait(handler, sos);
-
- // start iterating over soplog
- SortedIterator<ByteBuffer> range = sos.scan();
- assertEquals(0, ((SizeTieredCompactor) sos.getCompactor()).countInactiveReaders());
-
- for (int i = 1000; i < 5000; i++) {
- sos.put(wrapInt(i), wrapInt(i));
- if (i % 100 == 0) {
- sos.flush(null, handler);
- }
- }
-
- flushAndWait(handler, sos);
- compactAndWait(sos, false);
- assertEquals(1, ((SizeTieredCompactor) sos.getCompactor()).countInactiveReaders());
-
- range.close();
- compactAndWait(sos, false);
- assertEquals(0, ((SizeTieredCompactor) sos.getCompactor()).countInactiveReaders());
-
- validate(sos, 5000);
- sos.close();
- }
-
- @Override
- protected SortedOplogSetImpl createSoplogSet(String name) throws IOException {
- SortedOplogFactory factory = new NoFileSortedOplogFactory(name);
- Compactor compactor = createCompactor(factory);
-
- return new SortedOplogSetImpl(factory, Executors.newSingleThreadExecutor(), compactor);
- }
-
- protected abstract AbstractCompactor<?> createCompactor(SortedOplogFactory factory) throws IOException;
-
- private void validate(SortedReader<ByteBuffer> range, int count) throws IOException {
- int i = 0;
- for (SortedIterator<ByteBuffer> iter = range.scan(); iter.hasNext(); i++) {
- iter.next();
- assertEquals(i, iter.key().getInt());
- }
- assertEquals(count, i);
- range.close();
- }
-
- private void compactAndWait(SortedOplogSet sos, boolean force) throws InterruptedException {
- WaitingHandler wh = new WaitingHandler();
- sos.getCompactor().compact(force, wh);
- wh.waitForCompletion();
- assertNull(wh.getError());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/CompactionTestCase.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/CompactionTestCase.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/CompactionTestCase.java
deleted file mode 100644
index 7577b53..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/CompactionTestCase.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.logging.log4j.Logger;
-
-import junit.framework.TestCase;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.Compactor.CompactionHandler;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.Compactor.CompactionTracker;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplog.SortedOplogWriter;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.nofile.NoFileSortedOplogFactory;
-import com.gemstone.gemfire.internal.logging.LogService;
-
-public abstract class CompactionTestCase<T extends Comparable<T>> extends TestCase {
- private static final Logger logger = LogService.getLogger();
- protected SortedOplogFactory factory;
- protected AbstractCompactor<T> compactor;
-
- public void testMaxCompaction() throws Exception {
- for (int i = 0; i < 100; i += 2) {
- compactor.add(createSoplog(0, 100, i));
- compactor.add(createSoplog(100, 100, i+1));
-
- WaitingHandler wh = new WaitingHandler();
- compactor.compact(false, wh);
- wh.waitForCompletion();
- }
-
- WaitingHandler wh = new WaitingHandler();
- compactor.compact(true, wh);
- wh.waitForCompletion();
- }
-
- public void testClear() throws IOException {
- compactor.add(createSoplog(0, 100, 0));
- compactor.add(createSoplog(100, 100, 1));
- compactor.clear();
-
- assertEquals(0, compactor.getActiveReaders(null, null).size());
- assertFalse(compactor.getLevel(0).needsCompaction());
- }
-
- public void testInterruptedCompaction() throws IOException {
- compactor.add(createSoplog(0, 100, 0));
- compactor.add(createSoplog(100, 100, 1));
-
- compactor.testAbortDuringCompaction = true;
- boolean compacted = compactor.compact();
-
- assertFalse(compacted);
- assertEquals(2, compactor.getActiveReaders(null, null).size());
- assertTrue(compactor.getLevel(0).needsCompaction());
- }
-
- public void testClearDuringCompaction() throws Exception {
- compactor.add(createSoplog(0, 100, 0));
- compactor.add(createSoplog(100, 100, 1));
-
- compactor.testDelayDuringCompaction = new CountDownLatch(1);
- WaitingHandler wh = new WaitingHandler();
-
- logger.debug("Invoking compact");
- compactor.compact(false, wh);
-
- logger.debug("Invoking clear");
- compactor.clear();
-
- boolean compacted = wh.waitForCompletion();
-
- assertFalse(compacted);
- assertEquals(0, compactor.getActiveReaders(null, null).size());
- }
-
- public void testClearRepeat() throws Exception {
- int i = 0;
- do {
- testClearDuringCompaction();
- logger.debug("Test {} is complete", i);
- tearDown();
- setUp();
- } while (i++ < 100);
- }
-
- public void testCloseRepeat() throws Exception {
- int i = 0;
- do {
- testCloseDuringCompaction();
- logger.debug("Test {} is complete", i);
- tearDown();
- setUp();
- } while (i++ < 100);
- }
-
- public void testCloseDuringCompaction() throws Exception {
- compactor.add(createSoplog(0, 100, 0));
- compactor.add(createSoplog(100, 100, 1));
-
- compactor.testDelayDuringCompaction = new CountDownLatch(1);
- WaitingHandler wh = new WaitingHandler();
-
- compactor.compact(false, wh);
- compactor.close();
-
- boolean compacted = wh.waitForCompletion();
-
- assertFalse(compacted);
- assertEquals(0, compactor.getActiveReaders(null, null).size());
- }
-
- public void setUp() throws IOException {
- factory = new NoFileSortedOplogFactory("test");
- compactor = createCompactor(factory);
- }
-
- public void tearDown() throws Exception {
- compactor.close();
- for (File f : SortedReaderTestCase.getSoplogsToDelete()) {
- f.delete();
- }
- }
-
- protected SortedOplog createSoplog(int start, int count, int id) throws IOException {
- SortedOplog soplog = factory.createSortedOplog(new File("test-" + id + ".soplog"));
- SortedOplogWriter wtr = soplog.createWriter();
- try {
- for (int i = start; i < start + count; i++) {
- wtr.append(SortedReaderTestCase.wrapInt(i), SortedReaderTestCase.wrapInt(i));
- }
- } finally {
- wtr.close(null);
- }
- return soplog;
- }
-
- protected abstract AbstractCompactor<T> createCompactor(SortedOplogFactory factory) throws IOException;
-
- static class WaitingHandler implements CompactionHandler {
- private final CountDownLatch latch;
- private final AtomicReference<Throwable> err;
- private volatile boolean compacted;
-
- public WaitingHandler() {
- latch = new CountDownLatch(1);
- err = new AtomicReference<Throwable>();
- }
-
- public boolean waitForCompletion() throws InterruptedException {
- logger.debug("Waiting for compaction to complete");
- latch.await();
- logger.debug("Done waiting!");
-
- return compacted;
- }
-
- public Throwable getError() {
- return err.get();
- }
-
- @Override
- public void complete(boolean compacted) {
- logger.debug("Compaction completed with {}", compacted);
- this.compacted = compacted;
- latch.countDown();
- }
-
- @Override
- public void failed(Throwable ex) {
- err.set(ex);
- latch.countDown();
- }
- }
-
- static class FileTracker implements CompactionTracker<Integer> {
- @Override
- public void fileDeleted(File f) {
- }
-
- @Override
- public void fileAdded(File f, Integer attach) {
- }
-
- @Override
- public void fileRemoved(File f, Integer attach) {
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ComparisonTestCase.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ComparisonTestCase.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ComparisonTestCase.java
deleted file mode 100644
index cb9e909..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ComparisonTestCase.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import junit.framework.TestCase;
-
-import com.gemstone.gemfire.DataSerializer;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SerializedComparator;
-
-public abstract class ComparisonTestCase extends TestCase {
- public enum Comparison {
- EQ(0.0),
- LT(-1.0),
- GT(1.0);
-
- private final double sgn;
-
- private Comparison(double sgn) {
- this.sgn = sgn;
- }
-
- public double signum() {
- return sgn;
- }
-
- public Comparison opposite() {
- switch (this) {
- case LT: return GT;
- case GT : return LT;
- default : return EQ;
- }
- }
- }
-
- public void compare(SerializedComparator sc, Object o1, Object o2, Comparison result) throws IOException {
- double diff = sc.compare(convert(o1), convert(o2));
- assertEquals(String.format("%s ? %s", o1, o2), result.signum(), Math.signum(diff));
- }
-
- public void compareAsIs(SerializedComparator sc, byte[] b1, byte[] b2, Comparison result) throws IOException {
- double diff = sc.compare(b1, b2);
- assertEquals(result.signum(), Math.signum(diff));
- }
-
- public static byte[] convert(Object o) throws IOException {
- ByteArrayOutputStream b = new ByteArrayOutputStream();
- DataOutputStream d = new DataOutputStream(b);
- DataSerializer.writeObject(o, d);
-
- return b.toByteArray();
- }
-
- public static Object recover(byte[] obj, int off, int len) throws ClassNotFoundException, IOException {
- DataInputStream in = new DataInputStream(new ByteArrayInputStream(obj, off, len));
- return DataSerializer.readObject(in);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/IndexComparatorJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/IndexComparatorJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/IndexComparatorJUnitTest.java
deleted file mode 100644
index 27202ec..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/IndexComparatorJUnitTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.test.junit.categories.UnitTest;
-
-@Category(UnitTest.class)
-public class IndexComparatorJUnitTest extends ComparisonTestCase {
- protected IndexSerializedComparator comp;
-
- public void testSearch() throws IOException {
- byte[] k1 = comp.createCompositeKey(convert("aaaa"), convert(1));
- byte[] k2 = comp.createCompositeKey(convert("bbbb"), convert(2));
- byte[] k3 = comp.createCompositeKey(convert("bbbb"), convert(3));
- byte[] k4 = comp.createCompositeKey(convert(null), convert(1));
-
- byte[] s1 = comp.createCompositeKey(convert("aaaa"), new byte[] {SoplogToken.WILDCARD.toByte()});
- byte[] s2 = comp.createCompositeKey(convert("bbbb"), new byte[] {SoplogToken.WILDCARD.toByte()});
- byte[] s3 = comp.createCompositeKey(new byte[] {SoplogToken.WILDCARD.toByte()}, convert(1));
-
- compareAsIs(comp, k1, s1, Comparison.EQ);
- compareAsIs(comp, k2, s1, Comparison.GT);
- compareAsIs(comp, k1, s2, Comparison.LT);
- compareAsIs(comp, k2, s2, Comparison.EQ);
- compareAsIs(comp, k3, s2, Comparison.EQ);
- compareAsIs(comp, k4, s2, Comparison.GT);
- compareAsIs(comp, s3, k4, Comparison.EQ);
- }
-
- public void testCompositeKey() throws IOException {
- byte[] k1 = comp.createCompositeKey(convert("aaaa"), convert(1));
- byte[] k2 = comp.createCompositeKey(convert("bbbb"), convert(2));
- byte[] k3 = comp.createCompositeKey(convert("bbbb"), convert(3));
- byte[] k4 = comp.createCompositeKey(convert("cccc"), convert(1));
- byte[] k5 = comp.createCompositeKey(convert(null), convert(1));
-
- compareAsIs(comp, k1, k1, Comparison.EQ);
- compareAsIs(comp, k1, k2, Comparison.LT);
- compareAsIs(comp, k2, k1, Comparison.GT);
- compareAsIs(comp, k2, k3, Comparison.LT);
- compareAsIs(comp, k3, k4, Comparison.LT);
-
- compareAsIs(comp, k4, k5, Comparison.LT);
- compareAsIs(comp, k5, k1, Comparison.GT);
- }
-
- public void testGetKey() throws Exception {
- ByteBuffer key = ByteBuffer.wrap(comp.createCompositeKey(convert("aaaa"), convert(1)));
-
- ByteBuffer k1 = comp.getKey(key, 0);
- assertEquals("aaaa", recover(k1.array(), k1.arrayOffset(), k1.remaining()));
-
- ByteBuffer k2 = comp.getKey(key, 1);
- assertEquals(1, recover(k2.array(), k2.arrayOffset(), k2.remaining()));
- }
-
- public void setUp() {
- comp = new IndexSerializedComparator();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/LexicographicalComparatorJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/LexicographicalComparatorJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/LexicographicalComparatorJUnitTest.java
deleted file mode 100644
index 0c0e93e..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/LexicographicalComparatorJUnitTest.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.internal.DSCODE;
-import com.gemstone.gemfire.test.junit.categories.UnitTest;
-
-@Category(UnitTest.class)
-public class LexicographicalComparatorJUnitTest extends ComparisonTestCase {
- private LexicographicalComparator lc;
-
- public void testMixedNumeric() throws Exception {
- compare(lc, (byte) 1, (short) 2, Comparison.LT);
- compare(lc, (byte) 1, (int) 2, Comparison.LT);
- compare(lc, (byte) 1, (long) 2, Comparison.LT);
- compare(lc, (byte) 1, 2.1f, Comparison.LT);
- compare(lc, (byte) 1, 2.1d, Comparison.LT);
-
- compare(lc, (short) 1, (byte) 2, Comparison.LT);
- compare(lc, (short) 1, (int) 2, Comparison.LT);
- compare(lc, (short) 1, (long) 2, Comparison.LT);
- compare(lc, (short) 1, 2.1f, Comparison.LT);
- compare(lc, (short) 1, 2.1d, Comparison.LT);
-
- compare(lc, (int) 1, (byte) 2, Comparison.LT);
- compare(lc, (int) 1, (short) 2, Comparison.LT);
- compare(lc, (int) 1, (long) 2, Comparison.LT);
- compare(lc, (int) 1, 2.1f, Comparison.LT);
- compare(lc, (int) 1, 2.1d, Comparison.LT);
-
- compare(lc, (long) 1, (byte) 2, Comparison.LT);
- compare(lc, (long) 1, (short) 2, Comparison.LT);
- compare(lc, (long) 1, (int) 2, Comparison.LT);
- compare(lc, (long) 1, 2.1f, Comparison.LT);
- compare(lc, (long) 1, 2.1d, Comparison.LT);
-
- compare(lc, 1.1f, (byte) 2, Comparison.LT);
- compare(lc, 1.1f, (short) 2, Comparison.LT);
- compare(lc, 1.1f, (int) 2, Comparison.LT);
- compare(lc, 1.1f, (long) 2, Comparison.LT);
- compare(lc, 1.1f, 2.1d, Comparison.LT);
-
- compare(lc, 1.1d, (byte) 2, Comparison.LT);
- compare(lc, 1.1d, (short) 2, Comparison.LT);
- compare(lc, 1.1d, (int) 2, Comparison.LT);
- compare(lc, 1.1d, (long) 2, Comparison.LT);
- compare(lc, 1.1d, 2.1f, Comparison.LT);
- }
-
- public void testBoolean() throws Exception {
- compare(lc, Boolean.TRUE, Boolean.TRUE, Comparison.EQ);
- compare(lc, Boolean.FALSE, Boolean.FALSE, Comparison.EQ);
- compare(lc, Boolean.TRUE, Boolean.FALSE, Comparison.GT);
- compare(lc, Boolean.FALSE, Boolean.TRUE, Comparison.LT);
- }
-
- public void testByte() throws Exception {
- compare(lc, (byte) 0, (byte) 0, Comparison.EQ);
- compare(lc, (byte) 0, (byte) 1, Comparison.LT);
- compare(lc, (byte) -1, (byte) 1, Comparison.LT);
- compare(lc, (byte) 1, (byte) -1, Comparison.GT);
- compare(lc, (byte) -2, (byte) -1, Comparison.LT);
- compare(lc, (byte) 1, (byte) 2, Comparison.LT);
- compare(lc, (byte) 2, (byte) 1, Comparison.GT);
- }
-
- public void testShort() throws Exception {
- compare(lc, (short) 0, (short) 0, Comparison.EQ);
- compare(lc, (short) 0, (short) 1, Comparison.LT);
- compare(lc, (short) -1, (short) 1, Comparison.LT);
- compare(lc, (short) 1, (short) -1, Comparison.GT);
- compare(lc, (short) -2, (short) -1, Comparison.LT);
- compare(lc, (short) 1, (short) 2, Comparison.LT);
- compare(lc, (short) 2, (short) 1, Comparison.GT);
- }
-
- public void testInt() throws Exception {
- compare(lc, (int) 0, (int) 0, Comparison.EQ);
- compare(lc, (int) 0, (int) 1, Comparison.LT);
- compare(lc, (int) -1, (int) 1, Comparison.LT);
- compare(lc, (int) 1, (int) -1, Comparison.GT);
- compare(lc, (int) -2, (int) -1, Comparison.LT);
- compare(lc, (int) 1, (int) 2, Comparison.LT);
- compare(lc, (int) 2, (int) 1, Comparison.GT);
- }
-
- public void testLong() throws Exception {
- compare(lc, (long) 0, (long) 0, Comparison.EQ);
- compare(lc, (long) 0, (long) 1, Comparison.LT);
- compare(lc, (long) -1, (long) 1, Comparison.LT);
- compare(lc, (long) 1, (long) -1, Comparison.GT);
- compare(lc, (long) -2, (long) -1, Comparison.LT);
- compare(lc, (long) 1, (long) 2, Comparison.LT);
- compare(lc, (long) 2, (long) 1, Comparison.GT);
- }
-
- public void testFloat() throws Exception {
- compare(lc, 0.0f, 0.0f, Comparison.EQ);
- compare(lc, 0.0f, 1.0f, Comparison.LT);
- compare(lc, -1.0f, 1.0f, Comparison.LT);
- compare(lc, 1.0f, -1.0f, Comparison.GT);
- compare(lc, -2.0f, -1.0f, Comparison.LT);
- compare(lc, 1.0f, 2.0f, Comparison.LT);
- compare(lc, 2.0f, 1.0f, Comparison.GT);
- compare(lc, 2.1f, 0.9f, Comparison.GT);
- }
-
- public void testDouble() throws Exception {
- compare(lc, 0.0d, 0.0d, Comparison.EQ);
- compare(lc, 0.0d, 1.0d, Comparison.LT);
- compare(lc, -1.0d, 1.0d, Comparison.LT);
- compare(lc, 1.0d, -1.0d, Comparison.GT);
- compare(lc, -2.0d, -1.0d, Comparison.LT);
- compare(lc, 1.0d, 2.0d, Comparison.LT);
- compare(lc, 2.0d, 1.0d, Comparison.GT);
- compare(lc, 2.1d, 0.9d, Comparison.GT);
- }
-
- public void testString() throws Exception {
- compare(lc, "", "", Comparison.EQ);
- compare(lc, "aa", "a", Comparison.GT);
- compare(lc, "a", "b", Comparison.LT);
- compare(lc, "b", "a", Comparison.GT);
- compare(lc, "baah", "aardvark", Comparison.GT);
- compare(lc, "Chloé", "Réal", Comparison.LT);
- compare(lc, "chowder", "Réal", Comparison.GT);
- compare(lc, "Réal", "chowder", Comparison.LT);
- compare(lc, "Réal", "Réa", Comparison.GT);
- compare(lc, "Réal", "Réa", Comparison.GT);
- compare(lc, "\u0061\u00a2\u0f00", "\u0061\u00a2\u0f00\u0061", Comparison.LT);
- }
-
- public void testChar() throws Exception {
- compare(lc, 'a', 'a', Comparison.EQ);
- compare(lc, 'a', 'b', Comparison.LT);
- compare(lc, 'b', 'a', Comparison.GT);
- }
-
- public void testNull() throws Exception {
- compare(lc, null, null, Comparison.EQ);
- compare(lc, null, "hi mom", Comparison.GT);
- compare(lc, "hi mom", null, Comparison.LT);
- }
-
- public void testObject() throws Exception {
- compare(lc, new BigInteger("1"), new BigInteger("1"), Comparison.EQ);
- compare(lc, new BigInteger("1"), new BigInteger("0"), Comparison.GT);
- compare(lc, new BigInteger("-1"), new BigInteger("0"), Comparison.LT);
- }
-
- public void testIntPerformance() throws Exception {
- ByteBuffer b1 = ByteBuffer.allocate(5).put(0, DSCODE.INTEGER);
- ByteBuffer b2 = ByteBuffer.allocate(5).put(0, DSCODE.INTEGER);
-
- for (int n = 0; n < 5; n++) {
- long diff = 0;
- int count = 10000000;
- long start = System.nanoTime();
- for (int i = 0; i < count; i++) {
- b1.putInt(1, i);
- b2.putInt(1, i + 1);
- diff += lc.compare(b1.array(), b1.arrayOffset(), b1.capacity(), b2.array(), b2.arrayOffset(), b2.capacity());
- }
- long nanos = System.nanoTime() - start;
-
- System.out.printf("(%d) %f int comparisons / ms\n", diff, 1000000.0 * count / nanos);
-
- diff = 0;
- start = System.nanoTime();
- for (int i = 0; i < count; i++) {
- b1.putInt(1, i);
- b2.putInt(1, i + 1);
- diff += Bytes.compareTo(b1.array(), b1.arrayOffset(), b1.capacity(), b2.array(), b2.arrayOffset(), b2.capacity());
- }
- nanos = System.nanoTime() - start;
-
- System.out.printf("(%d) %f byte comparisons / ms\n\n", diff, 1000000.0 * count / nanos);
- }
- }
-
- protected void setUp() {
- lc = new LexicographicalComparator();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/RecoverableSortedOplogSet.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/RecoverableSortedOplogSet.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/RecoverableSortedOplogSet.java
deleted file mode 100644
index 0b3e1f5..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/RecoverableSortedOplogSet.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.io.File;
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.nio.ByteBuffer;
-import java.util.EnumMap;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.AppendLog.AppendLogWriter;
-import com.gemstone.gemfire.internal.logging.LogService;
-
-public class RecoverableSortedOplogSet extends AbstractSortedReader implements SortedOplogSet {
- private static final Logger logger = LogService.getLogger();
-
- private final SortedOplogSet sos;
- private final long bufferSize;
-
- private final long maxBufferMemory;
-
- private final Lock rollLock;
- private AtomicReference<AppendLogWriter> writer;
-
- private final String logPrefix;
-
- public RecoverableSortedOplogSet(SortedOplogSet sos, long bufferSize, double memLimit) throws IOException {
- this.sos = sos;
- this.bufferSize = bufferSize;
-
- this.logPrefix = "<" + sos.getFactory().getConfiguration().getName() + "> ";
-
- rollLock = new ReentrantLock();
- writer = new AtomicReference<AppendLogWriter>(AppendLog.create(nextLogFile()));
-
- maxBufferMemory = Math.round(memLimit * ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax());
- }
-
- @Override
- public boolean mightContain(byte[] key) throws IOException {
- return sos.mightContain(key);
- }
-
- @Override
- public ByteBuffer read(byte[] key) throws IOException {
- return sos.read(key);
- }
-
- @Override
- public SortedIterator<ByteBuffer> scan(byte[] from, boolean fromInclusive, byte[] to, boolean toInclusive) throws IOException {
- return sos.scan(from, fromInclusive, to, toInclusive);
- }
-
- @Override
- public SortedIterator<ByteBuffer> scan(
- byte[] from,
- boolean fromInclusive,
- byte[] to,
- boolean toInclusive,
- boolean ascending,
- MetadataFilter filter) throws IOException {
- return sos.scan(from, fromInclusive, to, toInclusive, ascending, filter);
- }
-
- @Override
- public SerializedComparator getComparator() {
- return sos.getComparator();
- }
-
- @Override
- public SortedOplogFactory getFactory() {
- return sos.getFactory();
- }
-
- @Override
- public SortedStatistics getStatistics() throws IOException {
- return sos.getStatistics();
- }
-
- @Override
- public void close() throws IOException {
- rollLock.lock();
- try {
- writer.get().close();
- writer.set(null);
- sos.close();
- } finally {
- rollLock.unlock();
- }
- }
-
- @Override
- public void put(byte[] key, byte[] value) throws IOException {
- throttle();
- if (sos.bufferSize() > bufferSize) {
- roll(false);
- }
-
- writer.get().append(key, value);
- sos.put(key, value);
- }
-
- @Override
- public long bufferSize() {
- return sos.bufferSize();
- }
-
- @Override
- public long unflushedSize() {
- return sos.unflushedSize();
- }
-
- @Override
- public void flush(EnumMap<Metadata, byte[]> metadata, FlushHandler handler) throws IOException {
- roll(true);
- }
-
- @Override
- public void flushAndClose(EnumMap<Metadata, byte[]> metadata) throws IOException {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public Compactor getCompactor() {
- return sos.getCompactor();
- }
-
- @Override
- public void clear() throws IOException {
- rollLock.lock();
- try {
- roll(true);
- sos.clear();
- } finally {
- rollLock.unlock();
- }
- }
-
- @Override
- public void destroy() throws IOException {
- roll(true);
- sos.destroy();
- }
-
- @Override
- public boolean isClosed() {
- return sos.isClosed();
- }
-
- private void throttle() {
- int n = 0;
- while (sos.bufferSize() + sos.unflushedSize() > maxBufferMemory) {
- try {
- Thread.sleep(1 << n++);
-
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- break;
- }
- }
- }
-
- private void roll(boolean wait) throws IOException {
- boolean locked = true;
- if (wait) {
- rollLock.lock();
- } else {
- locked = rollLock.tryLock();
- }
-
- if (locked) {
- try {
- AppendLogWriter next = AppendLog.create(nextLogFile());
- final AppendLogWriter old = writer.getAndSet(next);
- old.close();
-
- if (logger.isDebugEnabled()) {
- logger.debug("{}Rolling from {} to {}", this.logPrefix, old.getFile(), next.getFile());
- }
-
- sos.flush(null, new FlushHandler() {
- @Override
- public void complete() {
- old.getFile().delete();
- }
-
- @Override
- public void error(Throwable t) {
- }
- });
- } finally {
- rollLock.unlock();
- }
- }
- }
-
- private File nextLogFile() {
- return new File(sos.getFactory().getConfiguration().getName()
- + "-" + UUID.randomUUID().toString() + ".aolog");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SizeTieredCompactorJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SizeTieredCompactorJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SizeTieredCompactorJUnitTest.java
deleted file mode 100644
index ee76c55..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SizeTieredCompactorJUnitTest.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.Executors;
-
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplog.SortedOplogReader;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SortedIterator;
-import com.gemstone.gemfire.test.junit.categories.UnitTest;
-
-@Category(UnitTest.class)
-public class SizeTieredCompactorJUnitTest extends CompactionTestCase<Integer> {
- public void testBasic() throws Exception {
- compactor.add(createSoplog(0, 100, 0));
-
- assertEquals(1, compactor.getActiveReaders(null, null).size());
- assertEquals(1, compactor.getLevel(0).getSnapshot().size());
- assertFalse(compactor.getLevel(0).needsCompaction());
-
- WaitingHandler wh = new WaitingHandler();
- compactor.compact(false, wh);
- wh.waitForCompletion();
-
- assertEquals(1, compactor.getActiveReaders(null, null).size());
- assertEquals(1, compactor.getLevel(0).getSnapshot().size());
- assertFalse(compactor.getLevel(0).needsCompaction());
- }
-
- public void testCompactionLevel0() throws Exception {
- compactor.add(createSoplog(0, 100, 0));
- compactor.add(createSoplog(100, 100, 1));
-
- assertEquals(2, compactor.getActiveReaders(null, null).size());
- assertEquals(2, compactor.getLevel(0).getSnapshot().size());
- assertTrue(compactor.getLevel(0).needsCompaction());
-
- WaitingHandler wh = new WaitingHandler();
- compactor.compact(false, wh);
- wh.waitForCompletion();
-
- assertEquals(1, compactor.getActiveReaders(null, null).size());
- assertEquals(0, compactor.getLevel(0).getSnapshot().size());
- assertEquals(1, compactor.getLevel(1).getSnapshot().size());
- assertFalse(compactor.getLevel(0).needsCompaction());
- assertFalse(compactor.getLevel(1).needsCompaction());
-
- validate(compactor.getActiveReaders(null, null).iterator().next().get(), 0, 200);
- }
-
- public void testMultilevelCompaction() throws Exception {
- for (int i = 0; i < 8; i += 2) {
- compactor.add(createSoplog(0, 100, i));
- compactor.add(createSoplog(100, 100, i+1));
-
- WaitingHandler wh = new WaitingHandler();
- compactor.compact(false, wh);
- wh.waitForCompletion();
- }
-
- assertEquals(1, compactor.getActiveReaders(null, null).size());
- validate(compactor.getActiveReaders(null, null).iterator().next().get(), 0, 200);
- }
-
- public void testForceCompaction() throws Exception {
- compactor.add(createSoplog(0, 100, 0));
- compactor.add(createSoplog(100, 100, 1));
- boolean compacted = compactor.compact();
-
- assertTrue(compacted);
- validate(compactor.getActiveReaders(null, null).iterator().next().get(), 0, 200);
- }
-
- @Override
- protected AbstractCompactor<Integer> createCompactor(SortedOplogFactory factory) throws IOException {
- return new SizeTieredCompactor(factory,
- NonCompactor.createFileset("test", new File(".")),
- new FileTracker(),
- Executors.newSingleThreadExecutor(),
- 2, 4);
- }
-
- private void validate(SortedOplogReader soplog, int start, int count) throws IOException {
- int i = 0;
- for (SortedIterator<ByteBuffer> iter = soplog.scan(); iter.hasNext(); i++) {
- iter.next();
- assertEquals(i, iter.key().getInt());
- assertEquals(i, iter.value().getInt());
- }
- assertEquals(count, i);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SizeTieredSortedOplogSetJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SizeTieredSortedOplogSetJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SizeTieredSortedOplogSetJUnitTest.java
deleted file mode 100644
index cf1de00..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SizeTieredSortedOplogSetJUnitTest.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.concurrent.Executors;
-
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.CompactionTestCase.FileTracker;
-import com.gemstone.gemfire.test.junit.categories.UnitTest;
-
-@Category(UnitTest.class)
-public class SizeTieredSortedOplogSetJUnitTest extends CompactionSortedOplogSetTestCase {
- @Override
- protected AbstractCompactor<?> createCompactor(SortedOplogFactory factory) throws IOException {
- return new SizeTieredCompactor(factory,
- NonCompactor.createFileset("test", new File(".")),
- new FileTracker(),
- Executors.newSingleThreadExecutor(),
- 2, 4);
- }
- @Override
- public void testStatistics() throws IOException {
- // remove this noop override when bug 52249 is fixed
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedBufferJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedBufferJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedBufferJUnitTest.java
deleted file mode 100644
index 1d79059..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedBufferJUnitTest.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.nio.ByteBuffer;
-import java.util.Map.Entry;
-import java.util.NavigableMap;
-
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogFactory.SortedOplogConfiguration;
-import com.gemstone.gemfire.test.junit.categories.UnitTest;
-
-@Category(UnitTest.class)
-public class SortedBufferJUnitTest extends SortedReaderTestCase {
- @Override
- protected SortedReader<ByteBuffer> createReader(NavigableMap<byte[], byte[]> data) {
- SortedOplogConfiguration config = new SortedOplogConfiguration("test");
- SortedBuffer<Integer> sb = new SortedBuffer<Integer>(config, 0);
- for (Entry<byte[], byte[]> entry : data.entrySet()) {
- sb.put(entry.getKey(), entry.getValue());
- }
- return sb;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogSetJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogSetJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogSetJUnitTest.java
deleted file mode 100644
index bb0a198..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogSetJUnitTest.java
+++ /dev/null
@@ -1,273 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.NavigableMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.logging.log4j.Logger;
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogSet.FlushHandler;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SortedIterator;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.nofile.NoFileSortedOplogFactory;
-import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.test.junit.categories.UnitTest;
-
-@Category(UnitTest.class)
-public class SortedOplogSetJUnitTest extends SortedReaderTestCase {
- private static final Logger logger = LogService.getLogger();
- private SortedOplogSet set;
-
- public void testMergedIterator() throws IOException {
- FlushCounter handler = new FlushCounter();
- SortedOplogSet sos = createSoplogSet("merge");
-
- // #1
- sos.put(wrapInt(1), wrapInt(1));
- sos.put(wrapInt(2), wrapInt(1));
- sos.put(wrapInt(3), wrapInt(1));
- sos.put(wrapInt(4), wrapInt(1));
- sos.flush(null, handler);
-
- // #2
- sos.put(wrapInt(2), wrapInt(1));
- sos.put(wrapInt(4), wrapInt(1));
- sos.put(wrapInt(6), wrapInt(1));
- sos.put(wrapInt(8), wrapInt(1));
- sos.flush(null, handler);
-
- // #3
- sos.put(wrapInt(1), wrapInt(1));
- sos.put(wrapInt(3), wrapInt(1));
- sos.put(wrapInt(5), wrapInt(1));
- sos.put(wrapInt(7), wrapInt(1));
- sos.put(wrapInt(9), wrapInt(1));
- sos.flush(null, handler);
-
- // #4
- sos.put(wrapInt(0), wrapInt(1));
- sos.put(wrapInt(1), wrapInt(1));
- sos.put(wrapInt(4), wrapInt(1));
- sos.put(wrapInt(5), wrapInt(1));
-
- while (!handler.flushes.compareAndSet(3, 0));
-
- // the iteration pattern for this test should be 0-9:
- // 0 1 4 5 sbuffer #4
- // 1 3 5 7 9 soplog #3
- // 2 4 6 8 soplog #2
- // 1 2 3 4 soplog #1
- List<Integer> result = new ArrayList<Integer>();
- SortedIterator<ByteBuffer> iter = sos.scan();
- try {
- while (iter.hasNext()) {
- ByteBuffer key = iter.next();
- ByteBuffer val = iter.value();
- assertEquals(wrapInt(1), val);
-
- result.add(key.getInt());
- }
- } finally {
- iter.close();
- }
-
- sos.close();
-
- assertEquals(10, result.size());
- for (int i = 0; i < 10; i++) {
- assertEquals(i, result.get(i).intValue());
- }
- }
-
- @Override
- protected SortedReader<ByteBuffer> createReader(NavigableMap<byte[], byte[]> data)
- throws IOException {
- set = createSoplogSet("test");
-
- int i = 0;
- int flushes = 0;
- FlushCounter fc = new FlushCounter();
-
- for (Entry<byte[], byte[]> entry : data.entrySet()) {
- set.put(entry.getKey(), entry.getValue());
- if (i++ % 13 == 0) {
- flushes++;
- set.flush(null, fc);
- }
- }
-
- while (!fc.flushes.compareAndSet(flushes, 0));
- return set;
- }
-
- public void testClear() throws IOException {
- set.clear();
- validateEmpty(set);
- }
-
- public void testDestroy() throws IOException {
- set.destroy();
- assertTrue(((SortedOplogSetImpl) set).isClosed());
- try {
- set.scan();
- fail();
- } catch (IllegalStateException e) { }
- }
-
- public void testClearInterruptsFlush() throws Exception {
- FlushCounter handler = new FlushCounter();
- SortedOplogSetImpl sos = prepSoplogSet("clearDuringFlush");
-
- sos.testDelayDuringFlush = new CountDownLatch(1);
- sos.flush(null, handler);
- sos.clear();
-
- flushAndWait(handler, sos);
- validateEmpty(sos);
- assertEquals(2, handler.flushes.get());
- }
-
- public void testClearRepeat() throws Exception {
- int i = 0;
- do {
- testClearInterruptsFlush();
- logger.debug("Test {} is complete", i);
- tearDown();
- setUp();
- } while (i++ < 100);
- }
-
- public void testCloseInterruptsFlush() throws Exception {
- FlushCounter handler = new FlushCounter();
- SortedOplogSetImpl sos = prepSoplogSet("closeDuringFlush");
-
- sos.testDelayDuringFlush = new CountDownLatch(1);
- sos.flush(null, handler);
- sos.close();
-
- assertTrue(sos.isClosed());
- assertEquals(1, handler.flushes.get());
- }
-
- public void testDestroyInterruptsFlush() throws Exception {
- FlushCounter handler = new FlushCounter();
- SortedOplogSetImpl sos = prepSoplogSet("destroyDuringFlush");
-
- sos.testDelayDuringFlush = new CountDownLatch(1);
- sos.flush(null, handler);
- sos.destroy();
-
- assertTrue(sos.isClosed());
- assertEquals(1, handler.flushes.get());
- }
-
- public void testScanAfterClear() throws IOException {
- SortedIterator<ByteBuffer> iter = set.scan();
- set.clear();
- assertFalse(iter.hasNext());
- }
-
- public void testScanAfterClose() throws IOException {
- SortedIterator<ByteBuffer> iter = set.scan();
- set.close();
- assertFalse(iter.hasNext());
- }
-
- public void testEmptyFlush() throws Exception {
- FlushCounter handler = new FlushCounter();
- SortedOplogSet sos = prepSoplogSet("empty");
-
- flushAndWait(handler, sos);
- flushAndWait(handler, sos);
- }
-
- public void testErrorDuringFlush() throws Exception {
- FlushCounter handler = new FlushCounter();
- handler.error.set(true);
-
- SortedOplogSetImpl sos = prepSoplogSet("err");
- sos.testErrorDuringFlush = true;
-
- flushAndWait(handler, sos);
- }
-
- protected void validateEmpty(SortedOplogSet sos) throws IOException {
- assertEquals(0, sos.bufferSize());
- assertEquals(0, sos.unflushedSize());
-
- SortedIterator<ByteBuffer> iter = sos.scan();
- assertFalse(iter.hasNext());
- iter.close();
- sos.close();
- }
-
- protected SortedOplogSetImpl prepSoplogSet(String name) throws IOException {
- SortedOplogSetImpl sos = createSoplogSet(name);
-
- sos.put(wrapInt(1), wrapInt(1));
- sos.put(wrapInt(2), wrapInt(1));
- sos.put(wrapInt(3), wrapInt(1));
- sos.put(wrapInt(4), wrapInt(1));
-
- return sos;
- }
-
- protected SortedOplogSetImpl createSoplogSet(String name) throws IOException {
- SortedOplogFactory factory = new NoFileSortedOplogFactory(name);
- Compactor compactor = new NonCompactor(name, new File("."));
-
- return new SortedOplogSetImpl(factory, Executors.newSingleThreadExecutor(), compactor);
- }
-
- protected void flushAndWait(FlushCounter handler, SortedOplogSet sos)
- throws InterruptedException, IOException {
- sos.flush(null, handler);
- while (sos.unflushedSize() > 0) {
- Thread.sleep(1000);
- }
- }
-
- protected static class FlushCounter implements FlushHandler {
- private final AtomicInteger flushes = new AtomicInteger(0);
- private final AtomicBoolean error = new AtomicBoolean(false);
-
- @Override
- public void complete() {
- logger.debug("Flush complete! {}", this);
- assertFalse(error.get());
- flushes.incrementAndGet();
- }
-
- @Override
- public void error(Throwable t) {
- if (!error.get()) {
- t.printStackTrace();
- fail(t.getMessage());
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedReaderTestCase.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedReaderTestCase.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedReaderTestCase.java
deleted file mode 100644
index b41e6ba..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedReaderTestCase.java
+++ /dev/null
@@ -1,295 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.io.File;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import junit.framework.TestCase;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SortedIterator;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SortedStatistics;
-
-public abstract class SortedReaderTestCase extends TestCase {
- private NavigableMap<byte[], byte[]> data;
- protected SortedReader<ByteBuffer> reader;
-
- public static void assertEquals(byte[] expected, ByteBuffer actual) {
- assertEquals(expected.length, actual.remaining());
- for (int i = 0; i < expected.length; i++) {
- assertEquals(expected[i], actual.get(actual.position() + i));
- }
- }
-
- public void testComparator() {
- assertNotNull(reader.getComparator());
- }
-
- public void testStatistics() throws IOException {
- SortedStatistics stats = reader.getStatistics();
- try {
- assertEquals(data.size(), stats.keyCount());
- assertTrue(Arrays.equals(data.firstKey(), stats.firstKey()));
- assertTrue(Arrays.equals(data.lastKey(), stats.lastKey()));
-
- int keySize = 0;
- int valSize = 0;
- for (Entry<byte[], byte[]> entry : data.entrySet()) {
- keySize += entry.getKey().length;
- valSize += entry.getValue().length;
- }
-
- double avgKey = keySize / data.size();
- double avgVal = valSize / data.size();
-
- assertEquals(avgVal, stats.avgValueSize());
- assertEquals(avgKey, stats.avgKeySize());
-
- } finally {
- stats.close();
- }
- }
-
- public void testMightContain() throws IOException {
- for (byte[] key : data.keySet()) {
- assertTrue(reader.mightContain(key));
- }
- }
-
- public void testRead() throws IOException {
- for (byte[] key : data.keySet()) {
- assertEquals(data.get(key), reader.read(key));
- }
- }
-
- public void testScan() throws IOException {
- SortedIterator<ByteBuffer> scan = reader.scan();
- try {
- Iterator<Entry<byte[], byte[]>> iter = data.entrySet().iterator();
- doIter(scan, iter);
-
- } finally {
- scan.close();
- }
- }
-
- public void testMultithreadScan() throws Exception {
- int threads = 10;
- ExecutorService exec = Executors.newFixedThreadPool(threads);
- List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
- for (int i = 0; i < threads; i++) {
- tasks.add(new Callable<Boolean>() {
- @Override
- public Boolean call() throws Exception {
- testScan();
- return true;
- }
- });
- }
-
- int i = 0;
- while (i++ < 1000) {
- for (Future<Boolean> ft : exec.invokeAll(tasks)) {
- assertTrue(ft.get());
- }
- }
- }
-
- public void testScanReverse() throws IOException {
- SortedIterator<ByteBuffer> scan = reader.withAscending(false).scan();
- try {
- Iterator<Entry<byte[], byte[]>> iter = data.descendingMap().entrySet().iterator();
- doIter(scan, iter);
- } finally {
- scan.close();
- }
- }
-
- public void testHead() throws IOException {
- byte[] split = wrapInt(50);
- SortedIterator<ByteBuffer> scan = reader.head(split, true);
- try {
- Iterator<Entry<byte[], byte[]>> iter = data.headMap(split, true).entrySet().iterator();
- doIter(scan, iter);
- } finally {
- scan.close();
- }
-
- scan = reader.head(split, false);
- try {
- Iterator<Entry<byte[], byte[]>> iter = data.headMap(split, false).entrySet().iterator();
- doIter(scan, iter);
- } finally {
- scan.close();
- }
- }
-
- public void testTail() throws IOException {
- byte[] split = wrapInt(50);
- SortedIterator<ByteBuffer> scan = reader.tail(split, true);
- try {
- Iterator<Entry<byte[], byte[]>> iter = data.tailMap(split, true).entrySet().iterator();
- doIter(scan, iter);
- } finally {
- scan.close();
- }
-
- scan = reader.tail(split, false);
- try {
- Iterator<Entry<byte[], byte[]>> iter = data.tailMap(split, false).entrySet().iterator();
- doIter(scan, iter);
- } finally {
- scan.close();
- }
- }
-
- public void testScanWithBounds() throws IOException {
- byte[] lhs = wrapInt(10);
- byte[] rhs = wrapInt(90);
-
- // [lhs,rhs)
- SortedIterator<ByteBuffer> scan = reader.scan(lhs, rhs);
- try {
- Iterator<Entry<byte[], byte[]>> iter = data.subMap(lhs, rhs).entrySet().iterator();
- doIter(scan, iter);
- } finally {
- scan.close();
- }
-
- // (lhs,rhs)
- scan = reader.scan(lhs, false, rhs, false);
- try {
- Iterator<Entry<byte[], byte[]>> iter = data.subMap(lhs, false, rhs, false).entrySet().iterator();
- doIter(scan, iter);
- } finally {
- scan.close();
- }
-
- // [lhs,rhs]
- scan = reader.scan(lhs, true, rhs, true);
- try {
- Iterator<Entry<byte[], byte[]>> iter = data.subMap(lhs, true, rhs, true).entrySet().iterator();
- doIter(scan, iter);
- } finally {
- scan.close();
- }
- }
-
- public void testReverseScanWithBounds() throws IOException {
- data = data.descendingMap();
- byte[] rhs = wrapInt(10);
- byte[] lhs = wrapInt(90);
-
- SortedReader<ByteBuffer> rev = reader.withAscending(false);
-
- // [rhs,lhs)
- SortedIterator<ByteBuffer> scan = rev.scan(lhs, rhs);
- try {
- Iterator<Entry<byte[], byte[]>> iter = data.subMap(lhs, rhs).entrySet().iterator();
- doIter(scan, iter);
- } finally {
- scan.close();
- }
-
- // (rhs,lhs)
- scan = rev.scan(lhs, false, rhs, false);
- try {
- Iterator<Entry<byte[], byte[]>> iter = data.subMap(lhs, false, rhs, false).entrySet().iterator();
- doIter(scan, iter);
- } finally {
- scan.close();
- }
-
- // [rhs,lhs]
- scan = rev.scan(lhs, true, rhs, true);
- try {
- Iterator<Entry<byte[], byte[]>> iter = data.subMap(lhs, true, rhs, true).entrySet().iterator();
- doIter(scan, iter);
- } finally {
- scan.close();
- }
- }
-
- public void testScanEquality() throws IOException {
- byte[] val = wrapInt(10);
-
- // [val,val]
- SortedIterator<ByteBuffer> scan = reader.scan(val);
- try {
- Iterator<Entry<byte[], byte[]>> iter = data.subMap(val, true, val, true).entrySet().iterator();
- doIter(scan, iter);
- } finally {
- scan.close();
- }
- }
-
- public static byte[] wrapInt(int n) {
- ByteBuffer buf = (ByteBuffer) ByteBuffer.allocate(4).putInt(n).flip();
- return buf.array();
- }
-
- public static File[] getSoplogsToDelete() {
- return new File(".").listFiles(new FilenameFilter() {
- @Override
- public boolean accept(File dir, String name) {
- return name.endsWith("soplog") || name.endsWith("crc");
- }
- });
- }
-
- private void doIter(SortedIterator<ByteBuffer> scan, Iterator<Entry<byte[], byte[]>> iter) {
- while (scan.hasNext() || iter.hasNext()) {
- Entry<byte[], byte[]> expected = iter.next();
- assertEquals(expected.getKey(), scan.next());
- assertEquals(expected.getValue(), scan.value());
- }
- }
-
- @Override
- protected final void setUp() throws IOException {
- data = new TreeMap<byte[], byte[]>(new ByteComparator());
-
- for (int i = 0; i < 100; i++) {
- data.put(wrapInt(i), wrapInt(i));
- }
- reader = createReader(data);
- }
-
- @Override
- protected void tearDown() throws IOException {
- reader.close();
- for (File f : getSoplogsToDelete()) {
- f.delete();
- }
- }
-
- protected abstract SortedReader<ByteBuffer> createReader(NavigableMap<byte[], byte[]> data)
- throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/nofile/NoFileSortedOplogJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/nofile/NoFileSortedOplogJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/nofile/NoFileSortedOplogJUnitTest.java
deleted file mode 100644
index b6813d7..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/soplog/nofile/NoFileSortedOplogJUnitTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog.nofile;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Map.Entry;
-import java.util.NavigableMap;
-
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplog.SortedOplogWriter;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogFactory.SortedOplogConfiguration;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReaderTestCase;
-import com.gemstone.gemfire.test.junit.categories.UnitTest;
-
-@Category(UnitTest.class)
-public class NoFileSortedOplogJUnitTest extends SortedReaderTestCase {
- private NoFileSortedOplog mfile;
-
- @Override
- protected SortedReader<ByteBuffer> createReader(NavigableMap<byte[], byte[]> data) throws IOException {
- mfile = new NoFileSortedOplog(new SortedOplogConfiguration("nofile"));
-
- SortedOplogWriter wtr = mfile.createWriter();
- for (Entry<byte[], byte[]> entry : data.entrySet()) {
- wtr.append(entry.getKey(), entry.getValue());
- }
- wtr.close(null);
-
- return mfile.createReader();
- }
-}
[2/3] incubator-geode git commit: GEODE-544: Removes soplog code and
tests
Posted by ab...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SoplogToken.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SoplogToken.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SoplogToken.java
deleted file mode 100644
index 73175e7..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SoplogToken.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import com.gemstone.gemfire.internal.DSCODE;
-import com.gemstone.gemfire.internal.cache.EntryBits;
-
-/**
- * Defines serialized tokens for soplogs.
- */
-public enum SoplogToken {
-
- /** indicates the serialized value is a wildcard compares equal to any other key */
- WILDCARD( DSCODE.WILDCARD ),
-
- /** indicates the serialized value is a tombstone of a deleted key */
- TOMBSTONE( EntryBits.setTombstone((byte)0, true) ),
-
- /** indicates the serialized value is a invalid token*/
- INVALID( EntryBits.setInvalid((byte)0, true) ),
-
- /** indicates the serialized tombstone has been garbage collected*/
- REMOVED_PHASE2( EntryBits.setLocalInvalid((byte)0, true) ),
-
- /** indicates the value is serialized */
- SERIALIZED( EntryBits.setSerialized((byte)0, true) );
-
- /** the serialized form of the token */
- private final byte val;
-
- private SoplogToken(byte val) {
- this.val = val;
- }
-
- @Override
- public String toString() {
- return super.toString()+" byte:"+val;
- }
-
- /**
- * Returns the serialized form of the token.
- * @return the byte
- */
- public byte toByte() {
- return val;
- }
-
- /**
- * Returns true if either of the serialized objects is a wildcard.
- *
- * @param b1 the first object
- * @param off1 the first offset
- * @param b2 the second object
- * @param off2 the second object
- * @return true if a wildcard
- */
- public static boolean isWildcard(byte[] b1, int off1, byte[] b2, int off2) {
- return b1[off1] == DSCODE.WILDCARD || b2[off2] == DSCODE.WILDCARD;
- }
-
- /**
- * Returns true if the serialized object is a tombstone.
- *
- * @param b the magic entry type byte
- * @return true if a tombstone
- */
- public static boolean isTombstone(byte b) {
- return EntryBits.isTombstone(b);
- }
-
- /**
- * Returns true if the serialized object is an invalid token.
- *
- * @param b the magic entry type byte
- * @return true if invalid
- */
- public static boolean isInvalid(byte b) {
- return EntryBits.isInvalid(b);
- }
-
- /**
- * Returns true if the serialized tombstone was garbage collected
- *
- * @param b the magic entry type byte
- * @return true if RemovedPhase2
- */
- public static boolean isRemovedPhase2(byte b) {
- return EntryBits.isLocalInvalid(b);
- }
-
- /**
- * Returns true if the serialized object is not any token
- *
- *@param b the magic entry type byte
- * @return true if not any token
- */
- public static boolean isSerialized(byte b) {
- return EntryBits.isSerialized(b);
- }
-}
-
-
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedBuffer.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedBuffer.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedBuffer.java
deleted file mode 100644
index b301ac5..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedBuffer.java
+++ /dev/null
@@ -1,367 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.NavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogFactory.SortedOplogConfiguration;
-import com.gemstone.gemfire.internal.logging.LogService;
-
-/**
- * Provides an in-memory buffer to temporarily hold key/value pairs until they
- * can be flushed to disk. Each buffer instance can be optionally associated
- * with a user-specified tag for identification purposes.
- *
- * @param <T> the tag type
- * @author bakera
- */
-public class SortedBuffer<T> extends AbstractSortedReader {
- private static final Logger logger = LogService.getLogger();
-
- /** the tag */
- private final T tag;
-
- /** in-memory sorted key/vaue buffer */
- private final NavigableMap<byte[], byte[]> buffer;
-
- /** the stats */
- private final BufferStats stats;
-
- /** the metadata, set during flush */
- private final EnumMap<Metadata, byte[]> metadata;
-
- /** the command to run (or defer) when the flush is complete */
- private Runnable flushAction;
-
- private final String logPrefix;
-
- public SortedBuffer(SortedOplogConfiguration config, T tag) {
- assert config != null;
- assert tag != null;
-
- this.tag = tag;
-
- buffer = new ConcurrentSkipListMap<byte[], byte[]>(config.getComparator());
- stats = new BufferStats();
- metadata = new EnumMap<Metadata, byte[]>(Metadata.class);
-
- this.logPrefix = "<" + config.getName() + "#" + tag + "> ";
- }
-
- /**
- * Returns the tag associated with the buffer.
- * @return the tag
- */
- public T getTag() {
- return tag;
- }
-
- @Override
- public String toString() {
- return logger.getName() + this.logPrefix;
- }
-
- /**
- * Adds a new value to the buffer.
- * @param key the key
- * @param value the value
- */
- public void put(byte[] key, byte[] value) {
- if (buffer.put(key, value) == null) {
- // ASSUMPTION: updates don't significantly change the value length
- // this lets us optimize statistics calculations
- stats.add(key.length, value.length);
- }
- }
-
- /**
- * Allows sorted iteration over the buffer contents.
- * @return the buffer entries
- */
- public Iterable<Entry<byte[], byte[]>> entries() {
- return buffer.entrySet();
- }
-
- /**
- * Returns the number of entries in the buffer.
- * @return the count
- */
- public int count() {
- return buffer.size();
- }
-
- /**
- * Returns the size of the data in bytes.
- * @return the data size
- */
- public long dataSize() {
- return stats.totalSize();
- }
-
- /**
- * Clears the buffer of all entries.
- */
- public void clear() {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Clearing buffer", this.logPrefix);
- }
-
- buffer.clear();
- stats.clear();
- metadata.clear();
-
- synchronized (this) {
- flushAction = null;
- }
- }
-
- /**
- * Returns true if the flush completion has been deferred.
- * @return true if deferred
- */
- public synchronized boolean isDeferred() {
- return flushAction != null;
- }
-
- /**
- * Defers the flush completion to a later time. This is used to ensure correct
- * ordering of soplogs during parallel flushes.
- *
- * @param action the action to perform when ready
- */
- public synchronized void defer(Runnable action) {
- assert flushAction == null;
-
- if (logger.isDebugEnabled()) {
- logger.debug("{}Deferring flush completion", this.logPrefix);
- }
- flushAction = action;
- }
-
- /**
- * Completes the deferred flush operation.
- */
- public synchronized void complete() {
- assert flushAction != null;
-
- try {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Completing deferred flush operation", this.logPrefix);
- }
- flushAction.run();
-
- } finally {
- flushAction = null;
- }
- }
-
- /**
- * Returns the buffer metadata.
- * @return the metadata
- */
- public synchronized EnumMap<Metadata, byte[]> getMetadata() {
- return metadata;
- }
-
- /**
- * Returns the metadata value for the given key.
- *
- * @param name the metadata name
- * @return the requested metadata
- */
- public synchronized byte[] getMetadata(Metadata name) {
- return metadata.get(name);
- }
-
- /**
- * Sets the metadata for the buffer. This is not available until the buffer
- * is about to be flushed.
- *
- * @param metadata the metadata
- */
- public synchronized void setMetadata(EnumMap<Metadata, byte[]> metadata) {
- if (metadata != null) {
- this.metadata.putAll(metadata);
- }
- }
-
- @Override
- public boolean mightContain(byte[] key) {
- return true;
- }
-
- @Override
- public ByteBuffer read(byte[] key) throws IOException {
- byte[] val = buffer.get(key);
- if (val != null) {
- return ByteBuffer.wrap(val);
- }
- return null;
- }
-
- @Override
- public SortedIterator<ByteBuffer> scan(
- byte[] from, boolean fromInclusive,
- byte[] to, boolean toInclusive,
- boolean ascending,
- MetadataFilter filter) {
-
- if (filter == null || filter.accept(metadata.get(filter.getName()))) {
- NavigableMap<byte[],byte[]> subset = ascending ? buffer : buffer.descendingMap();
- if (from == null && to == null) {
- // we're good
- } else if (from == null) {
- subset = subset.headMap(to, toInclusive);
- } else if (to == null) {
- subset = subset.tailMap(from, fromInclusive);
- } else {
- subset = subset.subMap(from, fromInclusive, to, toInclusive);
- }
- return new BufferIterator(subset.entrySet().iterator());
- }
- return new BufferIterator(Collections.<byte[], byte[]>emptyMap().entrySet().iterator());
- }
-
- @Override
- public SerializedComparator getComparator() {
- return (SerializedComparator) buffer.comparator();
- }
-
- @Override
- public SortedStatistics getStatistics() {
- return stats;
- }
-
- @Override
- public void close() throws IOException {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Closing buffer", this.logPrefix);
- }
-
- synchronized (this) {
- flushAction = null;
- }
- }
-
- /**
- * Allows sorted iteration over the buffer contents.
- */
- public static class BufferIterator
- extends AbstractKeyValueIterator<ByteBuffer, ByteBuffer>
- implements SortedIterator<ByteBuffer>
- {
- /** the backing iterator */
- private final Iterator<Entry<byte[], byte[]>> entries;
-
- /** the iteration cursor */
- private Entry<byte[], byte[]> current;
-
- public BufferIterator(Iterator<Entry<byte[], byte[]>> iterator) {
- this.entries = iterator;
- }
-
- @Override
- public ByteBuffer key() {
- return ByteBuffer.wrap(current.getKey());
- }
-
- @Override
- public ByteBuffer value() {
- return ByteBuffer.wrap(current.getValue());
- }
-
- @Override
- public void close() {
- }
-
- @Override
- protected boolean step() {
- return (current = entries.hasNext() ? entries.next() : null) != null;
- }
- }
-
- private class BufferStats implements SortedStatistics {
- /** data size */
- private long totalSize;
-
- /** key count */
- private long keys;
-
- /** avg key size */
- private double avgKeySize;
-
- /** avg value size */
- private double avgValueSize;
-
- private synchronized void clear() {
- totalSize = 0;
- keys = 0;
- avgKeySize = 0;
- avgValueSize = 0;
- }
-
- private synchronized void add(int keyLength, int valueLength) {
- totalSize += keyLength + valueLength;
- avgKeySize = (keyLength + keys * avgKeySize) / (keys + 1);
- avgValueSize = (keyLength + keys * avgValueSize) / (keys + 1);
-
- keys++;
- }
-
- @Override
- public synchronized long keyCount() {
- return keys;
- }
-
- @Override
- public byte[] firstKey() {
- return buffer.firstKey();
- }
-
- @Override
- public byte[] lastKey() {
- return buffer.lastKey();
- }
-
- @Override
- public synchronized double avgKeySize() {
- return avgKeySize;
- }
-
- @Override
- public synchronized double avgValueSize() {
- return avgValueSize;
- }
-
- @Override
- public void close() {
- }
-
- public synchronized long totalSize() {
- return totalSize;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplog.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplog.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplog.java
deleted file mode 100644
index 95fb411..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplog.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.EnumMap;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.Metadata;
-
-/**
- * Defines the API for reading and writing sorted key/value pairs. The keys
- * are expected to be lexicographically comparable {@code byte[]} arrays.
- *
- * @author bakera
- */
-public interface SortedOplog {
- /**
- * Checks if a key may be present in a set.
- */
- public interface BloomFilter {
- /**
- * Returns true if the bloom filter might contain the supplied key. The
- * nature of the bloom filter is such that false positives are allowed, but
- * false negatives cannot occur.
- *
- * @param key the key to test
- * @return true if the key might be present
- */
- boolean mightContain(byte[] key);
- }
-
- /**
- * Reads key/value pairs from the sorted file.
- */
- public interface SortedOplogReader extends SortedReader<ByteBuffer> {
- /**
- * Returns the bloom filter associated with this reader.
- * @return the bloom filter
- */
- BloomFilter getBloomFilter();
-
- /**
- * Returns the metadata value for the given key.
- *
- * @param name the metadata name
- * @return the requested metadata
- * @throws IOException error reading metadata
- */
- byte[] getMetadata(Metadata name) throws IOException;
-
- /**
- * Returns the file used to persist the soplog contents.
- * @return the file
- */
- File getFile();
-
- /**
- * @return file name
- */
- String getFileName();
-
- /**
- * renames the file to the input name
- *
- * @throws IOException
- */
- void rename(String name) throws IOException;
-
- /**
- * @return the modification timestamp of the file
- * @throws IOException
- */
- long getModificationTimeStamp() throws IOException;
-
- /**
- * Deletes the sorted oplog file
- */
- public void delete() throws IOException;
-
- /**
- * Returns true if the reader is closed.
- * @return true if closed
- */
- boolean isClosed();
- }
-
- /**
- * Writes key/value pairs in a sorted manner. Each entry that is appended
- * must have a key that is greater than or equal to the previous key.
- */
- public interface SortedOplogWriter {
- /**
- * Appends another key and value. The key is expected to be greater than
- * or equal to the last key that was appended.
- *
- * @param key the key
- * @param value the value
- * @throws IOException write error
- */
- void append(ByteBuffer key, ByteBuffer value) throws IOException;
-
- /**
- * Appends another key and value. The key is expected to be greater than
- * or equal to the last key that was appended.
- *
- * @param key the key
- * @param value the value
- * @throws IOException write error
- */
- void append(byte[] key, byte[] value) throws IOException;
-
- /**
- * Closes the file, first writing optional user and system metadata.
- *
- * @param metadata the metadata to include
- * @throws IOException unable to close file
- */
- void close(EnumMap<Metadata, byte[]> metadata) throws IOException;
-
- /**
- * Invoked to close and remove the file to clean up after an error.
- * @throws IOException error closing
- */
- void closeAndDelete() throws IOException;
- }
-
- /**
- * Creates a new sorted reader.
- *
- * @return the reader
- * @throws IOException error creating reader
- */
- SortedOplogReader createReader() throws IOException;
-
- /**
- * Creates a new sorted writer.
- *
- * @return the writer
- * @throws IOException error creating writer
- */
- SortedOplogWriter createWriter() throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogFactory.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogFactory.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogFactory.java
deleted file mode 100644
index a470d7e..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogFactory.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.EnumMap;
-
-import org.apache.hadoop.hbase.io.hfile.BlockCache;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.Compactor.MetadataCompactor;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.Metadata;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SerializedComparator;
-
-/**
- * Provides a means to construct a soplog.
- */
-public interface SortedOplogFactory {
- /**
- * Configures a <code>SortedOplog</code>.
- *
- * @author bakera
- */
- public class SortedOplogConfiguration {
- /** the default metadata compactor */
- public static MetadataCompactor DEFAULT_METADATA_COMPACTOR = new MetadataCompactor() {
- @Override
- public byte[] compact(byte[] metadata1, byte[] metadata2) {
- return metadata1;
- }
- };
-
- /**
- * Defines the available checksum algorithms.
- */
- public enum Checksum {
- NONE,
- CRC32
- }
-
- /**
- * Defines the available compression algorithms.
- */
- public enum Compression {
- NONE,
- }
-
- /**
- * Defines the available key encodings.
- */
- public enum KeyEncoding {
- NONE,
- }
-
- /** the soplog name */
- private final String name;
-
- /** the statistics */
- private final SortedOplogStatistics stats;
-
- private final HFileStoreStatistics storeStats;
-
- /** true if bloom filters are enabled */
- private boolean bloom;
-
- /** the soplog block size */
- private int blockSize;
-
- /** the number of bytes for each checksum */
- private int bytesPerChecksum;
-
- /** the checksum type */
- private Checksum checksum;
-
- /** the compression type */
- private Compression compression;
-
- /** the key encoding type */
- private KeyEncoding keyEncoding;
-
- /** the comparator */
- private SerializedComparator comparator;
-
- /** metadata comparers */
- private EnumMap<Metadata, MetadataCompactor> metaCompactors;
-
- private BlockCache blockCache;
-
- private boolean cacheDataBlocksOnRead;
-
- public SortedOplogConfiguration(String name) {
- this(name, null, new SortedOplogStatistics("GridDBRegionStatistics", name), new HFileStoreStatistics("GridDBStoreStatistics", name));
- }
-
- public SortedOplogConfiguration(String name, BlockCache blockCache, SortedOplogStatistics stats, HFileStoreStatistics storeStats) {
- this.name = name;
- this.stats = stats;
-
- // defaults
- bloom = true;
- blockSize = 1 << 16;
- bytesPerChecksum = 1 << 14;
- checksum = Checksum.NONE;
- compression = Compression.NONE;
- keyEncoding = KeyEncoding.NONE;
- comparator = new ByteComparator();
- this.cacheDataBlocksOnRead = true;
- this.storeStats = storeStats;
- this.blockCache = blockCache;
- }
-
- public SortedOplogConfiguration setBloomFilterEnabled(boolean enabled) {
- this.bloom = enabled;
- return this;
- }
-
- public SortedOplogConfiguration setBlockSize(int size) {
- this.blockSize = size;
- return this;
- }
-
- public SortedOplogConfiguration setBytesPerChecksum(int bytes) {
- this.bytesPerChecksum = bytes;
- return this;
- }
-
- public SortedOplogConfiguration setChecksum(Checksum type) {
- this.checksum = type;
- return this;
- }
-
- public SortedOplogConfiguration setCompression(Compression type) {
- this.compression = type;
- return this;
- }
-
- public SortedOplogConfiguration setKeyEncoding(KeyEncoding type) {
- this.keyEncoding = type;
- return this;
- }
-
- public SortedOplogConfiguration setComparator(SerializedComparator comp) {
- this.comparator = comp;
- return this;
- }
-
- public SortedOplogConfiguration addMetadataCompactor(Metadata name, MetadataCompactor compactor) {
- metaCompactors.put(name, compactor);
- return this;
- }
-
- /**
- * Returns the soplog name.
- * @return the name
- */
- public String getName() {
- return name;
- }
-
- /**
- * Returns the statistics.
- * @return the statistics
- */
- public SortedOplogStatistics getStatistics() {
- return stats;
- }
-
- public HFileStoreStatistics getStoreStatistics() {
- return storeStats;
- }
-
- /**
- * Returns true if the bloom filter is enabled.
- * @return true if enabled
- */
- public boolean isBloomFilterEnabled() {
- return bloom;
- }
-
- /**
- * Returns the block size in bytes.
- * @return the block size
- */
- public int getBlockSize() {
- return blockSize;
- }
-
- /**
- * Returns the number of bytes per checksum.
- * @return the bytes
- */
- public int getBytesPerChecksum() {
- return bytesPerChecksum;
- }
-
- /**
- * Returns the checksum type.
- * @return the checksum
- */
- public Checksum getChecksum() {
- return checksum;
- }
-
- /**
- * Returns the compression type.
- * @return the compression
- */
- public Compression getCompression() {
- return compression;
- }
-
- /**
- * Returns the key encoding type.
- * @return the key encoding
- */
- public KeyEncoding getKeyEncoding() {
- return keyEncoding;
- }
-
- /**
- * Returns the comparator.
- * @return the comparator
- */
- public SerializedComparator getComparator() {
- return comparator;
- }
-
- /**
- * Returns the metadata compactor for the given name.
- * @param name the metadata name
- * @return the compactor
- */
- public MetadataCompactor getMetadataCompactor(Metadata name) {
- MetadataCompactor mc = metaCompactors.get(name);
- if (mc != null) {
- return mc;
- }
- return DEFAULT_METADATA_COMPACTOR;
- }
-
- public BlockCache getBlockCache() {
- return this.blockCache;
- }
-
- public boolean getCacheDataBlocksOnRead() {
- return cacheDataBlocksOnRead ;
- }
- }
-
- /**
- * Returns the configuration.
- * @return the configuration
- */
- SortedOplogConfiguration getConfiguration();
-
- /**
- * Creates a new soplog.
- *
- * @param name the filename
- * @return the soplog
- * @throws IOException error creating soplog
- */
- SortedOplog createSortedOplog(File name) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogSet.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogSet.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogSet.java
deleted file mode 100644
index 2900229..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogSet.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.EnumMap;
-
-/**
- * Provides a unified view of the current SBuffer, the unflushed SBuffers, and
- * the existing soplogs.
- *
- * @author bakera
- */
-public interface SortedOplogSet extends SortedReader<ByteBuffer> {
- /**
- * Defines a callback handler for asynchronous operations.
- */
- public interface FlushHandler {
- /**
- * Invoked when the operation completed successfully.
- */
- void complete();
-
- /**
- * Invoked when the operation completed with an error.
- * @param t the error
- */
- void error(Throwable t);
- }
-
- /**
- * Inserts or updates an entry in the current buffer. This invocation may
- * block if the current buffer is full and there are too many outstanding
- * write requests.
- *
- * @param key the key
- * @param value the value
- * @throws IOException
- */
- void put(byte[] key, byte[] value) throws IOException;
-
- /**
- * Returns the size of the current buffer in bytes.
- * @return the buffer size
- */
- long bufferSize();
-
- /**
- * Returns the size of the unflushed buffers in bytes.
- * @return the unflushed size
- */
- long unflushedSize();
-
- /**
- * Requests that the current buffer be flushed to disk. This invocation may
- * block if there are too many outstanding write requests.
- *
- * @param metadata supplemental data to be included in the soplog
- * @param handler the flush completion callback
- * @throws IOException error preparing flush
- */
- void flush(EnumMap<Metadata, byte[]> metadata, FlushHandler handler) throws IOException;
-
- /**
- * Flushes the current buffer and closes the soplog set. Blocks until the flush
- * is completed.
- *
- * @param metadata supplemental data to be included in the soplog
- * @throws IOException error during flush
- */
- void flushAndClose(EnumMap<Metadata, byte[]> metadata) throws IOException;
-
- /**
- * Returns the configured compaction strategy.
- * @return the compactor
- */
- Compactor getCompactor();
-
- /**
- * Clears the current buffer, any existing buffers, and all active soplogs.
- *
- * @throws IOException unable to clear
- */
- void clear() throws IOException;
-
- /**
- * Clears existing and closes the soplog set.
- * @throws IOException unable to destroy
- */
- void destroy() throws IOException;
-
- /**
- * Returns true if the set is closed.
- * @return true if closed
- */
- boolean isClosed();
-
- /**
- * Returns the soplog factory.
- * @return the factory
- */
- SortedOplogFactory getFactory();
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogSetImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogSetImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogSetImpl.java
deleted file mode 100644
index 2cf1191..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogSetImpl.java
+++ /dev/null
@@ -1,780 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Deque;
-import java.util.EnumMap;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplog.SortedOplogReader;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplog.SortedOplogWriter;
-import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.util.AbortableTaskService;
-import com.gemstone.gemfire.internal.util.AbortableTaskService.AbortableTask;
-
-/**
- * Provides a unifies view across a set of sbuffers and soplogs. Updates are
- * made into the current sbuffer. When requested, the current sbuffer will be
- * flushed and subsequent updates will flow into a new sbuffer. All flushes are
- * done on a background thread.
- *
- * @author bakera
- */
-public class SortedOplogSetImpl extends AbstractSortedReader implements SortedOplogSet {
- private static final Logger logger = LogService.getLogger();
-
- /** creates new soplogs */
- private final SortedOplogFactory factory;
-
- /** the background flush thread pool */
- private final AbortableTaskService flusher;
-
- /** the compactor */
- private final Compactor compactor;
-
- /** the current sbuffer */
- private final AtomicReference<SortedBuffer<Integer>> current;
-
- /** the buffer count */
- private final AtomicInteger bufferCount;
-
- /** the unflushed sbuffers */
- private final Deque<SortedBuffer<Integer>> unflushed;
-
- /** the lock for access to unflushed and soplogs */
- private final ReadWriteLock rwlock;
-
- /** test hook for clear/close/destroy during flush */
- volatile CountDownLatch testDelayDuringFlush;
-
- /** test hook to cause IOException during flush */
- volatile boolean testErrorDuringFlush;
-
- private final String logPrefix;
-
- public SortedOplogSetImpl(final SortedOplogFactory factory, Executor exec, Compactor ctor) throws IOException {
- this.factory = factory;
- this.flusher = new AbortableTaskService(exec);
- this.compactor = ctor;
-
- rwlock = new ReentrantReadWriteLock();
- bufferCount = new AtomicInteger(0);
- unflushed = new ArrayDeque<SortedBuffer<Integer>>();
- current = new AtomicReference<SortedBuffer<Integer>>(
- new SortedBuffer<Integer>(factory.getConfiguration(), 0));
-
- this.logPrefix = "<" + factory.getConfiguration().getName() + "> ";
- if (logger.isDebugEnabled()) {
- logger.debug("{}Creating soplog set", this.logPrefix);
- }
- }
-
- @Override
- public boolean mightContain(byte[] key) throws IOException {
- // loops through the following readers:
- // current sbuffer
- // unflushed sbuffers
- // soplogs
- //
- // The loop has been unrolled for efficiency.
- //
- if (getCurrent().mightContain(key)) {
- return true;
- }
-
- // snapshot the sbuffers and soplogs for stable iteration
- List<SortedReader<ByteBuffer>> readers;
- Collection<TrackedReference<SortedOplogReader>> soplogs;
- rwlock.readLock().lock();
- try {
- readers = new ArrayList<SortedReader<ByteBuffer>>(unflushed);
- soplogs = compactor.getActiveReaders(key, key);
- for (TrackedReference<SortedOplogReader> tr : soplogs) {
- readers.add(tr.get());
- }
- } finally {
- rwlock.readLock().unlock();
- }
-
- try {
- for (SortedReader<ByteBuffer> rdr : readers) {
- if (rdr.mightContain(key)) {
- return true;
- }
- }
- return false;
- } finally {
- TrackedReference.decrementAll(soplogs);
- }
- }
-
- @Override
- public ByteBuffer read(byte[] key) throws IOException {
- // loops through the following readers:
- // current sbuffer
- // unflushed sbuffers
- // soplogs
- //
- // The loop has been slightly unrolled for efficiency.
- //
- ByteBuffer val = getCurrent().read(key);
- if (val != null) {
- return val;
- }
-
- // snapshot the sbuffers and soplogs for stable iteration
- List<SortedReader<ByteBuffer>> readers;
- Collection<TrackedReference<SortedOplogReader>> soplogs;
- rwlock.readLock().lock();
- try {
- readers = new ArrayList<SortedReader<ByteBuffer>>(unflushed);
- soplogs = compactor.getActiveReaders(key, key);
- for (TrackedReference<SortedOplogReader> tr : soplogs) {
- readers.add(tr.get());
- }
- } finally {
- rwlock.readLock().unlock();
- }
-
- try {
- for (SortedReader<ByteBuffer> rdr : readers) {
- if (rdr.mightContain(key)) {
- val = rdr.read(key);
- if (val != null) {
- return val;
- }
- }
- }
- return null;
- } finally {
- TrackedReference.decrementAll(soplogs);
- }
- }
-
- @Override
- public SortedIterator<ByteBuffer> scan(
- byte[] from, boolean fromInclusive,
- byte[] to, boolean toInclusive,
- boolean ascending,
- MetadataFilter filter) throws IOException {
-
- SerializedComparator sc = factory.getConfiguration().getComparator();
- sc = ascending ? sc : ReversingSerializedComparator.reverse(sc);
-
- List<SortedIterator<ByteBuffer>> scans = new ArrayList<SortedIterator<ByteBuffer>>();
- Collection<TrackedReference<SortedOplogReader>> soplogs;
- rwlock.readLock().lock();
- try {
- scans.add(getCurrent().scan(from, fromInclusive, to, toInclusive, ascending, filter));
- for (SortedBuffer<Integer> sb : unflushed) {
- scans.add(sb.scan(from, fromInclusive, to, toInclusive, ascending, filter));
- }
- soplogs = compactor.getActiveReaders(from, to);
- } finally {
- rwlock.readLock().unlock();
- }
-
- for (TrackedReference<SortedOplogReader> tr : soplogs) {
- scans.add(tr.get().scan(from, fromInclusive, to, toInclusive, ascending, filter));
- }
- return new MergedIterator(sc, soplogs, scans);
- }
-
- @Override
- public void put(byte[] key, byte[] value) {
- assert key != null;
- assert value != null;
-
- long start = factory.getConfiguration().getStatistics().getPut().begin();
- getCurrent().put(key, value);
- factory.getConfiguration().getStatistics().getPut().end(value.length, start);
- }
-
- @Override
- public long bufferSize() {
- return getCurrent().dataSize();
- }
-
- @Override
- public long unflushedSize() {
- long size = 0;
- rwlock.readLock().lock();
- try {
- for (SortedBuffer<Integer> sb : unflushed) {
- size += sb.dataSize();
- }
- } finally {
- rwlock.readLock().unlock();
- }
- return size;
- }
-
- @Override
- public void flushAndClose(EnumMap<Metadata, byte[]> metadata) throws IOException {
- final AtomicReference<Throwable> err = new AtomicReference<Throwable>(null);
- flush(metadata, new FlushHandler() {
- @Override public void complete() { }
- @Override public void error(Throwable t) { err.set(t); }
- });
-
- // waits for flush completion
- close();
-
- Throwable t = err.get();
- if (t != null) {
- throw new IOException(t);
- }
- }
-
- @Override
- public void flush(EnumMap<Metadata, byte[]> metadata, FlushHandler handler) {
- assert handler != null;
-
- long start = factory.getConfiguration().getStatistics().getFlush().begin();
-
- // flip to a new buffer
- final SortedBuffer<Integer> sb;
- rwlock.writeLock().lock();
- try {
- if (isClosed()) {
- handler.complete();
- factory.getConfiguration().getStatistics().getFlush().end(0, start);
-
- return;
- }
-
- sb = flipBuffer();
- if (sb.count() == 0) {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Skipping flush of empty buffer {}", this.logPrefix, sb);
- }
- handler.complete();
- return;
- }
-
- sb.setMetadata(metadata);
- unflushed.addFirst(sb);
-
- // Note: this is queued while holding the lock to ensure correct ordering
- // on the executor queue. Don't use a bounded queue here or we will block
- // the flush invoker.
- flusher.execute(new FlushTask(handler, sb, start));
-
- } finally {
- rwlock.writeLock().unlock();
- }
- }
-
- @Override
- public void clear() throws IOException {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Clearing soplog set", this.logPrefix);
- }
-
- long start = factory.getConfiguration().getStatistics().getClear().begin();
-
- // acquire lock to ensure consistency with flushes
- rwlock.writeLock().lock();
- try {
- SortedBuffer<Integer> tmp = current.get();
- if (tmp != null) {
- tmp.clear();
- }
-
- flusher.abortAll();
- for (SortedBuffer<Integer> sb : unflushed) {
- sb.clear();
- }
-
- unflushed.clear();
- compactor.clear();
-
- releaseTestDelay();
- flusher.waitForCompletion();
- factory.getConfiguration().getStatistics().getClear().end(start);
-
- } catch (IOException e) {
- factory.getConfiguration().getStatistics().getClear().error(start);
- throw (IOException) e.fillInStackTrace();
-
- } finally {
- rwlock.writeLock().unlock();
- }
- }
-
- @Override
- public void destroy() throws IOException {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Destroying soplog set", this.logPrefix);
- }
-
- long start = factory.getConfiguration().getStatistics().getDestroy().begin();
- try {
- unsetCurrent();
- clear();
- close();
-
- factory.getConfiguration().getStatistics().getDestroy().end(start);
-
- } catch (IOException e) {
- factory.getConfiguration().getStatistics().getDestroy().error(start);
- throw (IOException) e.fillInStackTrace();
- }
- }
-
- @Override
- public void close() throws IOException {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Closing soplog set", this.logPrefix);
- }
-
- unsetCurrent();
- releaseTestDelay();
-
- flusher.waitForCompletion();
- compactor.close();
- }
-
- @Override
- public SerializedComparator getComparator() {
- return factory.getConfiguration().getComparator();
- }
-
- @Override
- public SortedStatistics getStatistics() throws IOException {
- List<SortedStatistics> stats = new ArrayList<SortedStatistics>();
- Collection<TrackedReference<SortedOplogReader>> soplogs;
-
- // snapshot, this is expensive
- rwlock.readLock().lock();
- try {
- stats.add(getCurrent().getStatistics());
- for (SortedBuffer<Integer> sb : unflushed) {
- stats.add(sb.getStatistics());
- }
- soplogs = compactor.getActiveReaders(null, null);
- } finally {
- rwlock.readLock().unlock();
- }
-
- for (TrackedReference<SortedOplogReader> tr : soplogs) {
- stats.add(tr.get().getStatistics());
- }
- return new MergedStatistics(stats, soplogs);
- }
-
- @Override
- public Compactor getCompactor() {
- return compactor;
- }
-
- @Override
- public boolean isClosed() {
- return current.get() == null;
- }
-
- @Override
- public SortedOplogFactory getFactory() {
- return factory;
- }
-
- private SortedBuffer<Integer> flipBuffer() {
- final SortedBuffer<Integer> sb;
- sb = getCurrent();
- SortedBuffer<Integer> next = new SortedBuffer<Integer>(
- factory.getConfiguration(),
- bufferCount.incrementAndGet());
-
- current.set(next);
- if (logger.isDebugEnabled()) {
- logger.debug("{}Switching from buffer {} to {}", this.logPrefix, sb, next);
- }
- return sb;
- }
-
- private SortedBuffer<Integer> getCurrent() {
- SortedBuffer<Integer> tmp = current.get();
- if (tmp == null) {
- throw new IllegalStateException("Closed");
- }
- return tmp;
- }
-
- private void unsetCurrent() {
- rwlock.writeLock().lock();
- try {
- SortedBuffer<Integer> tmp = current.getAndSet(null);
- if (tmp != null) {
- tmp.clear();
- }
- } finally {
- rwlock.writeLock().unlock();
- }
- }
-
- private void releaseTestDelay() {
- if (testDelayDuringFlush != null) {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Releasing testDelayDuringFlush", this.logPrefix);
- }
- testDelayDuringFlush.countDown();
- }
- }
-
- private class FlushTask implements AbortableTask {
- private final FlushHandler handler;
- private final SortedBuffer<Integer> buffer;
- private final long start;
-
- public FlushTask(FlushHandler handler, SortedBuffer<Integer> buffer, long start) {
- this.handler = handler;
- this.buffer = buffer;
- this.start = start;
- }
-
- @Override
- public void runOrAbort(final AtomicBoolean aborted) {
- try {
- // First transfer the contents of the buffer to a new soplog.
- final SortedOplog soplog = writeBuffer(buffer, aborted);
-
- // If we are aborted, someone else will cleanup the unflushed queue
- if (soplog == null || !lockOrAbort(aborted)) {
- handler.complete();
- return;
- }
-
- try {
- Runnable action = new Runnable() {
- @Override
- public void run() {
- try {
- compactor.add(soplog);
- compactor.compact(false, null);
-
- unflushed.removeFirstOccurrence(buffer);
-
- // TODO need to invoke this while NOT holding write lock
- handler.complete();
- factory.getConfiguration().getStatistics().getFlush().end(buffer.dataSize(), start);
-
- } catch (Exception e) {
- handleError(e, aborted);
- return;
- }
- }
- };
-
- // Enforce flush ordering for consistency. If the previous buffer flush
- // is incomplete, we defer completion and release the thread to avoid
- // deadlocks.
- if (buffer == unflushed.peekLast()) {
- action.run();
-
- SortedBuffer<Integer> tail = unflushed.peekLast();
- while (tail != null && tail.isDeferred() && !aborted.get()) {
- // TODO need to invoke this while NOT holding write lock
- tail.complete();
- tail = unflushed.peekLast();
- }
- } else {
- buffer.defer(action);
- }
- } finally {
- rwlock.writeLock().unlock();
- }
- } catch (Exception e) {
- handleError(e, aborted);
- }
- }
-
- @Override
- public void abortBeforeRun() {
- handler.complete();
- factory.getConfiguration().getStatistics().getFlush().end(start);
- }
-
- private void handleError(Exception e, AtomicBoolean aborted) {
- if (lockOrAbort(aborted)) {
- try {
- unflushed.removeFirstOccurrence(buffer);
- } finally {
- rwlock.writeLock().unlock();
- }
- }
-
- handler.error(e);
- factory.getConfiguration().getStatistics().getFlush().error(start);
- }
-
- private SortedOplog writeBuffer(SortedBuffer<Integer> sb, AtomicBoolean aborted)
- throws IOException {
- File f = compactor.getFileset().getNextFilename();
- if (logger.isDebugEnabled()) {
- logger.debug("{}Flushing buffer {} to {}", SortedOplogSetImpl.this.logPrefix, sb, f);
- }
-
- SortedOplog so = factory.createSortedOplog(f);
- SortedOplogWriter writer = so.createWriter();
- try {
- if (testErrorDuringFlush) {
- throw new IOException("Flush error due to testErrorDuringFlush=true");
- }
-
- for (Entry<byte[], byte[]> entry : sb.entries()) {
- if (aborted.get()) {
- writer.closeAndDelete();
- return null;
- }
- writer.append(entry.getKey(), entry.getValue());
- }
-
- checkTestDelay();
-
- writer.close(buffer.getMetadata());
- return so;
-
- } catch (IOException e) {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Encountered error while flushing buffer {}", SortedOplogSetImpl.this.logPrefix, sb, e);
- }
-
- writer.closeAndDelete();
- throw e;
- }
- }
-
- private void checkTestDelay() {
- if (testDelayDuringFlush != null) {
- try {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Waiting for testDelayDuringFlush", SortedOplogSetImpl.this.logPrefix);
- }
- testDelayDuringFlush.await();
-
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
- private boolean lockOrAbort(AtomicBoolean abort) {
- try {
- while (!abort.get()) {
- if (rwlock.writeLock().tryLock(10, TimeUnit.MILLISECONDS)) {
- return true;
- }
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- return false;
- }
- }
-
- private class MergedStatistics implements SortedStatistics {
- private final List<SortedStatistics> stats;
- private final Collection<TrackedReference<SortedOplogReader>> soplogs;
-
- public MergedStatistics(List<SortedStatistics> stats, Collection<TrackedReference<SortedOplogReader>> soplogs) {
- this.stats = stats;
- this.soplogs = soplogs;
- }
-
- @Override
- public long keyCount() {
- // TODO we have no way of determining the overall key population
- // just assume no overlap for now
- long keys = 0;
- for (SortedStatistics ss : stats) {
- keys += ss.keyCount();
- }
- return keys;
- }
-
- @Override
- public byte[] firstKey() {
- byte[] first = stats.get(0).firstKey();
- for (int i = 1; i < stats.size(); i++) {
- byte[] tmp = stats.get(i).firstKey();
- if (getComparator().compare(first, tmp) > 0) {
- first = tmp;
- }
- }
- return first;
- }
-
- @Override
- public byte[] lastKey() {
- byte[] last = stats.get(0).lastKey();
- for (int i = 1; i < stats.size(); i++) {
- byte[] tmp = stats.get(i).lastKey();
- if (getComparator().compare(last, tmp) < 0) {
- last = tmp;
- }
- }
- return last;
- }
-
- @Override
- public double avgKeySize() {
- double avg = 0;
- for (SortedStatistics ss : stats) {
- avg += ss.avgKeySize();
- }
- return avg / stats.size();
- }
-
- @Override
- public double avgValueSize() {
- double avg = 0;
- for (SortedStatistics ss : stats) {
- avg += ss.avgValueSize();
- }
- return avg / stats.size();
- }
-
- @Override
- public void close() {
- TrackedReference.decrementAll(soplogs);
- }
- }
-
- /**
- * Provides ordered iteration across a set of sorted data sets.
- */
- public static class MergedIterator
- extends AbstractKeyValueIterator<ByteBuffer, ByteBuffer>
- implements SortedIterator<ByteBuffer>
- {
- /** the comparison operator */
- private final SerializedComparator comparator;
-
- /** the reference counted soplogs */
- private final Collection<TrackedReference<SortedOplogReader>> soplogs;
-
- /** the backing iterators */
- private final List<SortedIterator<ByteBuffer>> iters;
-
- /** the current key */
- private ByteBuffer key;
-
- /** the current value */
- private ByteBuffer value;
-
- public MergedIterator(SerializedComparator comparator,
- Collection<TrackedReference<SortedOplogReader>> soplogs,
- List<SortedIterator<ByteBuffer>> iters) {
- this.comparator = comparator;
- this.soplogs = soplogs;
- this.iters = iters;
-
- // initialize iteration positions
- int i = 0;
- while (i < iters.size()) {
- i = advance(i);
- }
- }
-
- @Override
- public ByteBuffer key() {
- return key;
- }
-
- @Override
- public ByteBuffer value() {
- return value;
- }
-
- @Override
- protected boolean step() {
- if (iters.isEmpty() || readerIsClosed()) {
- return false;
- }
-
- int cursor = 0;
- key = iters.get(cursor).key();
-
- int i = 1;
- while (i < iters.size()) {
- ByteBuffer tmp = iters.get(i).key();
-
- int diff = comparator.compare(tmp.array(), tmp.arrayOffset(), tmp.remaining(),
- key.array(), key.arrayOffset(), key.remaining());
- if (diff < 0) {
- cursor = i++;
- key = tmp;
-
- } else if (diff == 0) {
- i = advance(i);
-
- } else {
- i++;
- }
- }
-
- value = iters.get(cursor).value();
- advance(cursor);
-
- return true;
- }
-
- @Override
- public void close() {
- for (SortedIterator<ByteBuffer> iter : iters) {
- iter.close();
- }
- TrackedReference.decrementAll(soplogs);
- }
-
- private int advance(int idx) {
- // either advance the cursor or remove the iterator
- if (!iters.get(idx).hasNext()) {
- iters.remove(idx).close();
- return idx;
- }
- iters.get(idx).next();
- return idx + 1;
- }
-
- private boolean readerIsClosed() {
- for (TrackedReference<SortedOplogReader> tr : soplogs) {
- if (tr.get().isClosed()) {
- return true;
- }
- }
- return false;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/BlockCacheHolder.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/BlockCacheHolder.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/BlockCacheHolder.java
deleted file mode 100644
index eb5154c..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/BlockCacheHolder.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog.hfile;
-
-import org.apache.hadoop.hbase.io.hfile.BlockCache;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.HFileStoreStatistics;
-
-public class BlockCacheHolder {
- private BlockCache cache;
- private HFileStoreStatistics stats;
-
- public BlockCacheHolder(HFileStoreStatistics stats, BlockCache cache) {
- this.stats = stats;
- this.cache = cache;
- }
-
- public synchronized BlockCache getBlockCache() {
- return cache;
- }
-
- public synchronized HFileStoreStatistics getHFileStoreStats() {
- return stats;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/HFileSortedOplog.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/HFileSortedOplog.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/HFileSortedOplog.java
deleted file mode 100644
index 56c6960..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/HFileSortedOplog.java
+++ /dev/null
@@ -1,694 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog.hfile;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.Map.Entry;
-import java.util.NoSuchElementException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.AbstractSortedReader;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.DelegatingSerializedComparator;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.ReversingSerializedComparator;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedBuffer.BufferIterator;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplog;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogFactory.SortedOplogConfiguration;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.Metadata;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SerializedComparator;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SortedIterator;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SortedStatistics;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.util.Bytes;
-import com.gemstone.gemfire.internal.util.Hex;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
-import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
-import org.apache.hadoop.hbase.io.hfile.HFileScanner;
-import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
-import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
-import org.apache.hadoop.hbase.util.BloomFilterFactory;
-import org.apache.hadoop.hbase.util.BloomFilterWriter;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.logging.log4j.Logger;
-import com.gemstone.gemfire.internal.logging.LogService;
-
-/**
- * Provides a soplog backed by an HFile.
- *
- * @author bakera
- */
-public class HFileSortedOplog implements SortedOplog {
- public static final byte[] MAGIC = new byte[] { 0x53, 0x4F, 0x50 };
- public static final byte[] VERSION_1 = new byte[] { 0x1 };
-
- // FileInfo is not visible
- private static final byte[] AVG_KEY_LEN = "hfile.AVG_KEY_LEN".getBytes();
- private static final byte[] AVG_VALUE_LEN = "hfile.AVG_VALUE_LEN".getBytes();
-
- /** a default bloom filter */
- private static final BloomFilter DUMMY_BLOOM = new BloomFilter() {
- @Override
- public boolean mightContain(byte[] key) {
- return true;
- }
- };
-
- static final Configuration hconf;
- private static final FileSystem fs;
-
- static {
- // Leave these HBase properties set to defaults for now
- //
- // hfile.block.cache.size (25% of heap)
- // hbase.hash.type (murmur)
- // hfile.block.index.cacheonwrite (false)
- // hfile.index.block.max.size (128k)
- // hfile.format.version (2)
- // io.storefile.bloom.block.size (128k)
- // hfile.block.bloom.cacheonwrite (false)
- // hbase.rs.cacheblocksonwrite (false)
- // hbase.offheapcache.minblocksize (64k)
- // hbase.offheapcache.percentage (0)
- hconf = new Configuration();
-
- hconf.setBoolean("hbase.metrics.showTableName", true);
- SchemaMetrics.configureGlobally(hconf);
-
- try {
- fs = FileSystem.get(hconf);
- } catch (IOException e) {
- throw new IllegalStateException(e);
- }
- }
-
- private static enum InternalMetadata {
- /** identifies the soplog as a gemfire file, required */
- GEMFIRE_MAGIC,
-
- /** identifies the soplog version, required */
- VERSION,
-
- /** identifies the statistics data */
- STATISTICS,
-
- /** identifies the names of embedded comparators */
- COMPARATORS;
-
- public byte[] bytes() {
- return ("gemfire." + name()).getBytes();
- }
- }
-
- //logger instance
- private static final Logger logger = LogService.getLogger();
- protected final String logPrefix;
-
- /** the configuration */
- private final SortedOplogConfiguration sopConfig;
-
- /** the hfile cache config */
- private final CacheConfig hcache;
-
- /** the hfile location */
- private Path path;
-
- public HFileSortedOplog(File hfile, SortedOplogConfiguration sopConfig) throws IOException {
- assert hfile != null;
- assert sopConfig != null;
-
- this.sopConfig = sopConfig;
- path = fs.makeQualified(new Path(hfile.toString()));
-
-// hcache = new CacheConfig(hconf, sopConfig.getCacheDataBlocksOnRead(), sopConfig.getBlockCache(),
-// HFileSortedOplogFactory.convertStatistics(sopConfig.getStatistics(), sopConfig.getStoreStatistics()));
- hcache = new CacheConfig(hconf);
- this.logPrefix = "<" + sopConfig.getName() + "> ";
- }
-
- @Override
- public SortedOplogReader createReader() throws IOException {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Creating an HFile reader on " + path, logPrefix);
- }
-
- return new HFileSortedOplogReader();
- }
-
- @Override
- public SortedOplogWriter createWriter() throws IOException {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Creating an HFile writer on " + path, logPrefix);
- }
-
- return new HFileSortedOplogWriter();
- }
-
- SortedOplogConfiguration getConfiguration() {
- return sopConfig;
- }
-
- private class HFileSortedOplogReader extends AbstractSortedReader implements SortedOplogReader {
- private final Reader reader;
- private final BloomFilter bloom;
- private final SortedStatistics stats;
- private volatile boolean closed;
-
- public HFileSortedOplogReader() throws IOException {
- reader = HFile.createReader(fs, path, hcache);
- validate();
-
- stats = new HFileSortedStatistics(reader);
- closed = false;
-
- if (reader.getComparator() instanceof DelegatingSerializedComparator) {
- loadComparators((DelegatingSerializedComparator) reader.getComparator());
- }
-
- DataInput bin = reader.getGeneralBloomFilterMetadata();
- if (bin != null) {
- final org.apache.hadoop.hbase.util.BloomFilter hbloom = BloomFilterFactory.createFromMeta(bin, reader);
- if (reader.getComparator() instanceof DelegatingSerializedComparator) {
- loadComparators((DelegatingSerializedComparator) hbloom.getComparator());
- }
-
- bloom = new BloomFilter() {
- @Override
- public boolean mightContain(byte[] key) {
- assert key != null;
-
- long start = sopConfig.getStatistics().getBloom().begin();
- boolean foundKey = hbloom.contains(key, 0, key.length, null);
- sopConfig.getStatistics().getBloom().end(start);
-
- if (logger.isTraceEnabled()) {
- logger.trace(String.format("{}Bloom check on %s for key %s: %b",
- path, Hex.toHex(key), foundKey), logPrefix);
- }
- return foundKey;
- }
- };
-
- } else {
- bloom = DUMMY_BLOOM;
- }
- }
-
- @Override
- public boolean mightContain(byte[] key) {
- return getBloomFilter().mightContain(key);
- }
-
- @Override
- public ByteBuffer read(byte[] key) throws IOException {
- assert key != null;
-
- if (logger.isTraceEnabled()) {
- logger.trace(String.format("{}Reading key %s from %s", Hex.toHex(key), path), logPrefix);
- }
-
- long start = sopConfig.getStatistics().getRead().begin();
- try {
- HFileScanner seek = reader.getScanner(true, true);
- if (seek.seekTo(key) == 0) {
- ByteBuffer val = seek.getValue();
- sopConfig.getStatistics().getRead().end(val.remaining(), start);
-
- return val;
- }
-
- sopConfig.getStatistics().getRead().end(start);
- sopConfig.getStatistics().getBloom().falsePositive();
- return null;
-
- } catch (IOException e) {
- sopConfig.getStatistics().getRead().error(start);
- throw (IOException) e.fillInStackTrace();
- }
- }
-
- @Override
- public SortedIterator<ByteBuffer> scan(
- byte[] from, boolean fromInclusive,
- byte[] to, boolean toInclusive,
- boolean ascending,
- MetadataFilter filter) throws IOException {
- if (filter == null || filter.accept(getMetadata(filter.getName()))) {
- SerializedComparator tmp = (SerializedComparator) reader.getComparator();
- tmp = ascending ? tmp : ReversingSerializedComparator.reverse(tmp);
-
-// HFileScanner scan = reader.getScanner(true, false, ascending, false);
- HFileScanner scan = reader.getScanner(true, false, false);
- return new HFileSortedIterator(scan, tmp, from, fromInclusive, to, toInclusive);
- }
- return new BufferIterator(Collections.<byte[], byte[]>emptyMap().entrySet().iterator());
- }
-
- @Override
- public SerializedComparator getComparator() {
- return (SerializedComparator) reader.getComparator();
- }
-
- @Override
- public SortedStatistics getStatistics() {
- return stats;
- }
-
- @Override
- public boolean isClosed() {
- return closed;
- }
-
- @Override
- public void close() throws IOException {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Closing reader on " + path, logPrefix);
- }
- reader.close();
- closed = true;
- }
-
- @Override
- public BloomFilter getBloomFilter() {
- return bloom;
- }
-
- @Override
- public byte[] getMetadata(Metadata name) throws IOException {
- assert name != null;
-
- return reader.loadFileInfo().get(name.bytes());
- }
-
- @Override
- public File getFile() {
- return new File(path.toUri());
- }
-
- @Override
- public String getFileName() {
- return path.getName();
- }
-
- @Override
- public long getModificationTimeStamp() throws IOException {
- FileStatus[] stats = FSUtils.listStatus(fs, path, null);
- if (stats != null && stats.length == 1) {
- return stats[0].getModificationTime();
- } else {
- return 0;
- }
- }
-
- @Override
- public void rename(String name) throws IOException {
- Path parent = path.getParent();
- Path newPath = new Path(parent, name);
- fs.rename(path, newPath);
- // update path to point to the new path
- path = newPath;
- }
-
- @Override
- public void delete() throws IOException {
- fs.delete(path, false);
- }
-
- @Override
- public String toString() {
- return path.toString();
- }
-
- private byte[] getMetadata(InternalMetadata name) throws IOException {
- return reader.loadFileInfo().get(name.bytes());
- }
-
- private void validate() throws IOException {
- // check magic
- byte[] magic = getMetadata(InternalMetadata.GEMFIRE_MAGIC);
- if (!Arrays.equals(magic, MAGIC)) {
- throw new IOException(LocalizedStrings.Soplog_INVALID_MAGIC.toLocalizedString(Hex.toHex(magic)));
- }
-
- // check version compatibility
- byte[] ver = getMetadata(InternalMetadata.VERSION);
- if (logger.isDebugEnabled()) {
- logger.debug("{}Soplog version is " + Hex.toHex(ver), logPrefix);
- }
-
- if (!Arrays.equals(ver, VERSION_1)) {
- throw new IOException(LocalizedStrings.Soplog_UNRECOGNIZED_VERSION.toLocalizedString(Hex.toHex(ver)));
- }
- }
-
- private void loadComparators(DelegatingSerializedComparator comparator) throws IOException {
- byte[] raw = reader.loadFileInfo().get(InternalMetadata.COMPARATORS.bytes());
- assert raw != null;
-
- DataInput in = new DataInputStream(new ByteArrayInputStream(raw));
- comparator.setComparators(readComparators(in));
- }
-
- private SerializedComparator[] readComparators(DataInput in) throws IOException {
- try {
- SerializedComparator[] comps = new SerializedComparator[in.readInt()];
- assert comps.length > 0;
-
- for (int i = 0; i < comps.length; i++) {
- comps[i] = (SerializedComparator) Class.forName(in.readUTF()).newInstance();
- if (comps[i] instanceof DelegatingSerializedComparator) {
- ((DelegatingSerializedComparator) comps[i]).setComparators(readComparators(in));
- }
- }
- return comps;
-
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
- }
-
- private class HFileSortedOplogWriter implements SortedOplogWriter {
- private final Writer writer;
- private final BloomFilterWriter bfw;
-
- public HFileSortedOplogWriter() throws IOException {
- writer = HFile.getWriterFactory(hconf, hcache)
- .withPath(fs, path)
- .withBlockSize(sopConfig.getBlockSize())
- .withBytesPerChecksum(sopConfig.getBytesPerChecksum())
- .withChecksumType(HFileSortedOplogFactory.convertChecksum(sopConfig.getChecksum()))
-// .withComparator(sopConfig.getComparator())
- .withCompression(HFileSortedOplogFactory.convertCompression(sopConfig.getCompression()))
- .withDataBlockEncoder(HFileSortedOplogFactory.convertEncoding(sopConfig.getKeyEncoding()))
- .create();
-
- bfw = sopConfig.isBloomFilterEnabled() ?
-// BloomFilterFactory.createGeneralBloomAtWrite(hconf, hcache, BloomType.ROW,
-// 0, writer, sopConfig.getComparator())
- BloomFilterFactory.createGeneralBloomAtWrite(hconf, hcache, BloomType.ROW,
- 0, writer)
- : null;
- }
-
- @Override
- public void append(byte[] key, byte[] value) throws IOException {
- assert key != null;
- assert value != null;
-
- if (logger.isTraceEnabled()) {
- logger.trace(String.format("{}Appending key %s to %s", Hex.toHex(key), path), logPrefix);
- }
-
- try {
- writer.append(key, value);
- if (bfw != null) {
- bfw.add(key, 0, key.length);
- }
- } catch (IOException e) {
- throw (IOException) e.fillInStackTrace();
- }
- }
-
- @Override
- public void append(ByteBuffer key, ByteBuffer value) throws IOException {
- assert key != null;
- assert value != null;
-
- if (logger.isTraceEnabled()) {
- logger.trace(String.format("{}Appending key %s to %s",
- Hex.toHex(key.array(), key.arrayOffset(), key.remaining()), path), logPrefix);
- }
-
- try {
- byte[] keyBytes = new byte[key.remaining()];
- key.duplicate().get(keyBytes);
- byte[] valueBytes = new byte[value.remaining()];
- value.duplicate().get(valueBytes);
- writer.append(keyBytes, valueBytes);
- if (bfw != null) {
- bfw.add(key.array(), key.arrayOffset(), key.remaining());
- }
- } catch (IOException e) {
- throw (IOException) e.fillInStackTrace();
- }
- }
-
- @Override
- public void close(EnumMap<Metadata, byte[]> metadata) throws IOException {
- if (logger.isTraceEnabled()) {
- logger.debug("{}Finalizing and closing writer on " + path, logPrefix);
- }
-
- if (bfw != null) {
- bfw.compactBloom();
- writer.addGeneralBloomFilter(bfw);
- }
-
- // append system metadata
- writer.appendFileInfo(InternalMetadata.GEMFIRE_MAGIC.bytes(), MAGIC);
- writer.appendFileInfo(InternalMetadata.VERSION.bytes(), VERSION_1);
-
- // append comparator info
-// if (writer.getComparator() instanceof DelegatingSerializedComparator) {
-// ByteArrayOutputStream bos = new ByteArrayOutputStream();
-// DataOutput out = new DataOutputStream(bos);
-//
-// writeComparatorInfo(out, ((DelegatingSerializedComparator) writer.getComparator()).getComparators());
-// writer.appendFileInfo(InternalMetadata.COMPARATORS.bytes(), bos.toByteArray());
-// }
-
- // TODO write statistics data to soplog
- // writer.appendFileInfo(Meta.STATISTICS.toBytes(), null);
-
- // append user metadata
- if (metadata != null) {
- for (Entry<Metadata, byte[]> entry : metadata.entrySet()) {
- writer.appendFileInfo(entry.getKey().name().getBytes(), entry.getValue());
- }
- }
-
- writer.close();
- }
-
- @Override
- public void closeAndDelete() throws IOException {
- if (logger.isTraceEnabled()) {
- logger.debug("{}Closing writer and deleting " + path, logPrefix);
- }
-
- writer.close();
- new File(writer.getPath().toUri()).delete();
- }
-
-// private void writeComparatorInfo(DataOutput out, SerializedComparator[] comparators) throws IOException {
-// out.writeInt(comparators.length);
-// for (SerializedComparator sc : comparators) {
-// out.writeUTF(sc.getClass().getName());
-// if (sc instanceof DelegatingSerializedComparator) {
-// writeComparatorInfo(out, ((DelegatingSerializedComparator) sc).getComparators());
-// }
-// }
-// }
- }
-
- private class HFileSortedIterator implements SortedIterator<ByteBuffer> {
- private final HFileScanner scan;
- private final SerializedComparator comparator;
-
- private final byte[] from;
- private final boolean fromInclusive;
-
- private final byte[] to;
- private final boolean toInclusive;
-
- private final long start;
- private long bytes;
-
- private boolean foundNext;
-
- private ByteBuffer key;
- private ByteBuffer value;
-
- public HFileSortedIterator(HFileScanner scan, SerializedComparator comparator,
- byte[] from, boolean fromInclusive,
- byte[] to, boolean toInclusive) throws IOException {
- this.scan = scan;
- this.comparator = comparator;
- this.from = from;
- this.fromInclusive = fromInclusive;
- this.to = to;
- this.toInclusive = toInclusive;
-
- assert from == null
- || to == null
- || comparator.compare(from, 0, from.length, to, 0, to.length) <= 0;
-
- start = sopConfig.getStatistics().getScan().begin();
- foundNext = evalFrom();
- }
-
- @Override
- public ByteBuffer key() {
- return key;
- }
-
- @Override
- public ByteBuffer value() {
- return value;
- }
-
- @Override
- public boolean hasNext() {
- if (!foundNext) {
- foundNext = step();
- }
- return foundNext;
- }
-
- @Override
- public ByteBuffer next() {
- long startNext = sopConfig.getStatistics().getScan().beginIteration();
-
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
-
- foundNext = false;
- key = scan.getKey();
- value = scan.getValue();
-
- int len = key.remaining() + value.remaining();
- bytes += len;
- sopConfig.getStatistics().getScan().endIteration(len, startNext);
-
- return key;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void close() {
- sopConfig.getStatistics().getScan().end(bytes, start);
- }
-
- private boolean step() {
- try {
- if (!scan.isSeeked()) {
- return false;
-
- } else if (scan.next() && evalTo()) {
- return true;
- }
- } catch (IOException e) {
- throw new HDFSIOException("Error from HDFS during iteration", e);
- }
- return false;
- }
-
- private boolean evalFrom() throws IOException {
- if (from == null) {
- return scan.seekTo() && evalTo();
-
- } else {
- int compare = scan.seekTo(from);
- if (compare < 0) {
- return scan.seekTo() && evalTo();
-
- } else if (compare == 0 && fromInclusive) {
- return true;
-
- } else {
- return step();
- }
- }
- }
-
- private boolean evalTo() throws IOException {
- int compare = -1;
- if (to != null) {
- ByteBuffer key = scan.getKey();
- compare = comparator.compare(
- key.array(), key.arrayOffset(), key.remaining(),
- to, 0, to.length);
- }
-
- return compare < 0 || (compare == 0 && toInclusive);
- }
- }
-
- private static class HFileSortedStatistics implements SortedStatistics {
- private final Reader reader;
- private final int keySize;
- private final int valueSize;
-
- public HFileSortedStatistics(Reader reader) throws IOException {
- this.reader = reader;
-
- byte[] sz = reader.loadFileInfo().get(AVG_KEY_LEN);
- keySize = Bytes.toInt(sz[0], sz[1], sz[2], sz[3]);
-
- sz = reader.loadFileInfo().get(AVG_VALUE_LEN);
- valueSize = Bytes.toInt(sz[0], sz[1], sz[2], sz[3]);
- }
-
- @Override
- public long keyCount() {
- return reader.getEntries();
- }
-
- @Override
- public byte[] firstKey() {
- return reader.getFirstKey();
- }
-
- @Override
- public byte[] lastKey() {
- return reader.getLastKey();
- }
-
- @Override
- public double avgKeySize() {
- return keySize;
- }
-
- @Override
- public double avgValueSize() {
- return valueSize;
- }
-
- @Override
- public void close() {
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/HFileSortedOplogFactory.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/HFileSortedOplogFactory.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/HFileSortedOplogFactory.java
deleted file mode 100644
index 9546fd3..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/HFileSortedOplogFactory.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog.hfile;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.io.hfile.BlockCache;
-import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
-import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
-import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
-import org.apache.hadoop.hbase.util.ChecksumType;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.HFileStoreStatistics;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplog;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogFactory;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogFactory.SortedOplogConfiguration.Checksum;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogFactory.SortedOplogConfiguration.Compression;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogFactory.SortedOplogConfiguration.KeyEncoding;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics;
-
-/**
- * Creates HFile soplogs.
- *
- * @author bakera
- */
-public class HFileSortedOplogFactory implements SortedOplogFactory {
- private final SortedOplogConfiguration config;
-
- public HFileSortedOplogFactory(String name, BlockCache blockCache, SortedOplogStatistics stats, HFileStoreStatistics storeStats) {
- config = new SortedOplogConfiguration(name, blockCache, stats, storeStats);
- }
-
- @Override
- public SortedOplogConfiguration getConfiguration() {
- return config;
- }
-
- @Override
- public SortedOplog createSortedOplog(File name) throws IOException {
- return new HFileSortedOplog(name, config);
- }
-
- public static ChecksumType convertChecksum(Checksum type) {
- switch (type) {
- case NONE: return ChecksumType.NULL;
-
- default:
- case CRC32: return ChecksumType.CRC32;
- }
- }
-
- public static Algorithm convertCompression(Compression type) {
- switch (type) {
- default:
- case NONE: return Algorithm.NONE;
- }
- }
-
- public static HFileDataBlockEncoder convertEncoding(KeyEncoding type) {
- switch (type) {
- default:
- case NONE: return NoOpDataBlockEncoder.INSTANCE;
- }
- }
-}
[3/3] incubator-geode git commit: GEODE-544: Removes soplog code and
tests
Posted by ab...@apache.org.
GEODE-544: Removes soplog code and tests
The "soplog" code was a partial implementation of a concurrent
LSM tree stored on local disk. This is not currently used anywhere
so is being cleaned up. The interfaces used by the HDFS feature
have not been deleted.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/f95eb683
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/f95eb683
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/f95eb683
Branch: refs/heads/feature/GEODE-544
Commit: f95eb6832bc834c4b157553bc43fb1c30631cc4c
Parents: e1eb74e
Author: Anthony Baker <ab...@pivotal.io>
Authored: Thu Nov 12 14:39:48 2015 -0800
Committer: Anthony Baker <ab...@pivotal.io>
Committed: Thu Nov 12 14:39:48 2015 -0800
----------------------------------------------------------------------
.../persistence/soplog/AbstractCompactor.java | 533 -------------
.../soplog/AbstractKeyValueIterator.java | 76 --
.../soplog/AbstractSortedReader.java | 135 ----
.../soplog/ArraySerializedComparator.java | 144 ----
.../cache/persistence/soplog/Compactor.java | 174 -----
.../soplog/CompositeSerializedComparator.java | 57 --
.../soplog/IndexSerializedComparator.java | 127 ---
.../cache/persistence/soplog/LevelTracker.java | 120 ---
.../soplog/LexicographicalComparator.java | 460 -----------
.../cache/persistence/soplog/NonCompactor.java | 110 ---
.../soplog/ReversingSerializedComparator.java | 67 --
.../persistence/soplog/SizeTieredCompactor.java | 198 -----
.../cache/persistence/soplog/SoplogToken.java | 116 ---
.../cache/persistence/soplog/SortedBuffer.java | 367 ---------
.../cache/persistence/soplog/SortedOplog.java | 158 ----
.../persistence/soplog/SortedOplogFactory.java | 278 -------
.../persistence/soplog/SortedOplogSet.java | 118 ---
.../persistence/soplog/SortedOplogSetImpl.java | 780 -------------------
.../soplog/hfile/BlockCacheHolder.java | 39 -
.../soplog/hfile/HFileSortedOplog.java | 694 -----------------
.../soplog/hfile/HFileSortedOplogFactory.java | 80 --
.../soplog/nofile/NoFileSortedOplog.java | 244 ------
.../soplog/nofile/NoFileSortedOplogFactory.java | 41 -
.../cache/persistence/soplog/AppendLog.java | 65 --
.../ArraySerializedComparatorJUnitTest.java | 95 ---
.../CompactionSortedOplogSetTestCase.java | 134 ----
.../persistence/soplog/CompactionTestCase.java | 206 -----
.../persistence/soplog/ComparisonTestCase.java | 77 --
.../soplog/IndexComparatorJUnitTest.java | 79 --
.../LexicographicalComparatorJUnitTest.java | 204 -----
.../soplog/RecoverableSortedOplogSet.java | 221 ------
.../soplog/SizeTieredCompactorJUnitTest.java | 110 ---
.../SizeTieredSortedOplogSetJUnitTest.java | 43 -
.../soplog/SortedBufferJUnitTest.java | 39 -
.../soplog/SortedOplogSetJUnitTest.java | 273 -------
.../soplog/SortedReaderTestCase.java | 295 -------
.../nofile/NoFileSortedOplogJUnitTest.java | 48 --
37 files changed, 7005 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/AbstractCompactor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/AbstractCompactor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/AbstractCompactor.java
deleted file mode 100644
index 0b62313..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/AbstractCompactor.java
+++ /dev/null
@@ -1,533 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.EnumMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplog.SortedOplogReader;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplog.SortedOplogWriter;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogSetImpl.MergedIterator;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.Metadata;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SerializedComparator;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SortedIterator;
-import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.util.AbortableTaskService;
-import com.gemstone.gemfire.internal.util.AbortableTaskService.AbortableTask;
-
-public abstract class AbstractCompactor<T extends Comparable<T>> implements Compactor {
- protected static final Logger logger = LogService.getLogger();
-
- /** the soplog factory */
- protected final SortedOplogFactory factory;
-
- /** the fileset */
- protected final Fileset<T> fileset;
-
- /** the soplog tracker */
- protected final CompactionTracker<T> tracker;
-
- /** thread for background compaction */
- protected final AbortableTaskService compactor;
-
- /** inactive files waiting to be deleted */
- private final Queue<TrackedReference<SortedOplogReader>> inactive;
-
- /** the soplogs */
- protected final List<Level> levels;
-
- /** provides consistent view of all levels */
- private final ReadWriteLock levelLock;
-
- /** test flag to abort compaction */
- volatile boolean testAbortDuringCompaction;
-
- /** test flag to delay compaction */
- volatile CountDownLatch testDelayDuringCompaction;
-
- protected final String logPrefix;
-
- public AbstractCompactor(SortedOplogFactory factory,
- Fileset<T> fileset, CompactionTracker<T> tracker,
- Executor exec) {
- assert factory != null;
- assert fileset != null;
- assert tracker != null;
- assert exec != null;
-
- this.factory = factory;
- this.fileset = fileset;
- this.tracker = tracker;
-
- compactor = new AbortableTaskService(exec);
- inactive = new ConcurrentLinkedQueue<TrackedReference<SortedOplogReader>>();
-
- levelLock = new ReentrantReadWriteLock();
- levels = new ArrayList<Level>();
-
- this.logPrefix = "<" + factory.getConfiguration().getName() + "> ";
- }
-
- @Override
- public final void add(SortedOplog soplog) throws IOException {
- levels.get(0).add(soplog);
- }
-
- @Override
- public final boolean compact() throws IOException {
- final CountDownLatch done = new CountDownLatch(1);
- final AtomicReference<Object> result = new AtomicReference<Object>(null);
-
- compact(true, new CompactionHandler() {
- @Override
- public void complete(boolean compacted) {
- result.set(compacted);
- done.countDown();
- }
-
- @Override
- public void failed(Throwable ex) {
- result.set(ex);
- done.countDown();
- }
- });
-
- try {
- done.await();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new InterruptedIOException();
- }
-
- Object val = result.get();
- if (val instanceof Throwable) {
- throw new IOException((Throwable) val);
- }
-
- assert val != null;
- return (Boolean) val;
- }
-
- @Override
- public final void compact(final boolean force, final CompactionHandler ch) {
- // TODO implement force=true, results in a single soplog
- AbortableTask task = new AbortableTask() {
- @Override
- public void runOrAbort(AtomicBoolean aborted) {
- final boolean isDebugEnabled = logger.isDebugEnabled();
- if (isDebugEnabled) {
- logger.debug("{}Beginning compaction", AbstractCompactor.this.logPrefix);
- }
-
- // TODO could do this in one go instead of level-by-level
- try {
- boolean compacted = false;
- for (Level level : levels) {
- if (aborted.get()) {
- if (isDebugEnabled) {
- logger.debug("{}Aborting compaction", AbstractCompactor.this.logPrefix);
- }
- break;
- }
-
- checkTestDelay();
- if (force || level.needsCompaction()) {
- if (isDebugEnabled) {
- logger.debug("{}Compacting level {}", AbstractCompactor.this.logPrefix, level);
- }
-
- long start = factory.getConfiguration().getStatistics().getMinorCompaction().begin();
- try {
- compacted |= level.compact(aborted);
- factory.getConfiguration().getStatistics().getMinorCompaction().end(start);
-
- } catch (IOException e) {
- factory.getConfiguration().getStatistics().getMinorCompaction().error(start);
- }
- }
- }
-
- cleanupInactive();
- if (ch != null) {
- if (isDebugEnabled) {
- logger.debug("{}Completed compaction", AbstractCompactor.this.logPrefix);
- }
- ch.complete(compacted);
- }
- } catch (Exception e) {
- if (isDebugEnabled) {
- logger.debug("{}Encountered an error during compaction", AbstractCompactor.this.logPrefix, e);
- }
- if (ch != null) {
- ch.failed(e);
- }
- }
- }
-
- @Override
- public void abortBeforeRun() {
- if (ch != null) {
- ch.complete(false);
- }
- }
- };
- compactor.execute(task);
- }
-
- @Override
- public final CompactionTracker<?> getTracker() {
- return tracker;
- }
-
- @Override
- public final Fileset<?> getFileset() {
- return fileset;
- }
-
- @Override
- public final Collection<TrackedReference<SortedOplogReader>> getActiveReaders(
- byte[] start, byte[] end) {
-
- // need to coordinate with clear() so we can get a consistent snapshot
- // across levels
- levelLock.readLock().lock();
- try {
- // TODO this seems very garbage-y
- List<TrackedReference<SortedOplogReader>> soplogs = new ArrayList<TrackedReference<SortedOplogReader>>();
- for (Level level : levels) {
- soplogs.addAll(level.getSnapshot(start, end));
- }
- return soplogs;
- } finally {
- levelLock.readLock().unlock();
- }
- }
-
- @Override
- public final void clear() throws IOException {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Clearing compactor", this.logPrefix);
- }
-
- compactor.abortAll();
- releaseTestDelay();
- compactor.waitForCompletion();
-
- levelLock.writeLock().lock();
- try {
- for (Level l : levels) {
- l.clear();
- }
- } finally {
- levelLock.writeLock().unlock();
- }
-
- cleanupInactive();
- }
-
- @Override
- public final void close() throws IOException {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Closing compactor", this.logPrefix);
- }
-
- compactor.abortAll();
- releaseTestDelay();
- compactor.waitForCompletion();
-
- levelLock.writeLock().lock();
- try {
- for (Level l : levels) {
- l.close();
- }
- } finally {
- levelLock.writeLock().unlock();
- }
-
- TrackedReference<SortedOplogReader> tr;
- while ((tr = inactive.poll()) != null) {
- deleteInactive(tr);
- }
- inactive.clear();
- }
-
- /**
- * Creates a new soplog by merging the supplied soplog readers.
- *
- * @param readers the readers to merge
- * @param collect true if deleted entries should be removed
- * @return the merged soplog
- *
- * @throws IOException error during merge operation
- */
- protected SortedOplog merge(
- Collection<TrackedReference<SortedOplogReader>> readers,
- boolean collect,
- AtomicBoolean aborted) throws IOException {
-
- SerializedComparator sc = null;
- List<SortedIterator<ByteBuffer>> iters = new ArrayList<SortedIterator<ByteBuffer>>();
- for (TrackedReference<SortedOplogReader> tr : readers) {
- iters.add(tr.get().scan());
- sc = tr.get().getComparator();
- }
-
- SortedIterator<ByteBuffer> scan = new MergedIterator(sc, readers, iters);
- try {
- if (!scan.hasNext()) {
- checkAbort(aborted);
- if (logger.isDebugEnabled()) {
- logger.debug("{}No entries left after compaction with readers {} ", this.logPrefix, readers);
- }
- return null;
- }
-
- File f = fileset.getNextFilename();
- if (logger.isDebugEnabled()) {
- logger.debug("{}Compacting soplogs {} into {}", this.logPrefix, readers, f);
- }
-
- if (testAbortDuringCompaction) {
- aborted.set(true);
- }
-
- SortedOplog soplog = factory.createSortedOplog(f);
- SortedOplogWriter wtr = soplog.createWriter();
- try {
- while (scan.hasNext()) {
- checkAbort(aborted);
- scan.next();
- if (!(collect && isDeleted(scan.value()))) {
- wtr.append(scan.key(), scan.value());
- }
- }
-
- EnumMap<Metadata, byte[]> metadata = mergeMetadata(readers);
- wtr.close(metadata);
- return soplog;
-
- } catch (IOException e) {
- wtr.closeAndDelete();
- throw e;
- }
- } finally {
- scan.close();
- }
- }
-
- protected EnumMap<Metadata, byte[]> mergeMetadata(
- Collection<TrackedReference<SortedOplogReader>> readers)
- throws IOException {
- // merge the metadata into the compacted file
- EnumMap<Metadata, byte[]> metadata = new EnumMap<Metadata, byte[]>(Metadata.class);
- for (Metadata meta : Metadata.values()) {
- byte[] val = null;
- for (TrackedReference<SortedOplogReader> tr : readers) {
- byte[] tmp = tr.get().getMetadata(meta);
- if (val == null) {
- val = tmp;
-
- } else if (tmp != null) {
- val = factory.getConfiguration().getMetadataCompactor(meta).compact(val, tmp);
- }
- }
- if (val != null) {
- metadata.put(meta, val);
- }
- }
- return metadata;
- }
-
- protected void releaseTestDelay() {
- if (testDelayDuringCompaction != null) {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Releasing testDelayDuringCompaction", this.logPrefix);
- }
- testDelayDuringCompaction.countDown();
- }
- }
-
- protected void checkTestDelay() {
- if (testDelayDuringCompaction != null) {
- try {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Waiting for testDelayDuringCompaction", this.logPrefix);
- }
- testDelayDuringCompaction.await();
-
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
-
- /**
- * Returns the number of inactive readers.
- * @return the inactive readers
- */
- protected int countInactiveReaders() {
- return inactive.size();
- }
-
- /**
- * Returns the requested level for testing purposes.
- * @param level the level ordinal
- * @return the level
- */
- protected Level getLevel(int level) {
- return levels.get(level);
- }
-
- protected void cleanupInactive() throws IOException {
- for (Iterator<TrackedReference<SortedOplogReader>> iter = inactive.iterator(); iter.hasNext(); ) {
- TrackedReference<SortedOplogReader> tr = iter.next();
- if (!tr.inUse() && inactive.remove(tr)) {
- deleteInactive(tr);
- }
- }
- }
-
- protected void markAsInactive(Iterable<TrackedReference<SortedOplogReader>> snapshot, T attach) throws IOException {
- final boolean isDebugEnabled = logger.isDebugEnabled();
- for (Iterator<TrackedReference<SortedOplogReader>> iter = snapshot.iterator(); iter.hasNext(); ) {
- TrackedReference<SortedOplogReader> tr = iter.next();
- if (isDebugEnabled) {
- logger.debug("{}Marking {} as inactive", this.logPrefix, tr);
- }
-
- inactive.add(tr);
- tracker.fileRemoved(tr.get().getFile(), attach);
-
- factory.getConfiguration().getStatistics().incActiveFiles(-1);
- factory.getConfiguration().getStatistics().incInactiveFiles(1);
- }
- }
-
- private boolean isDeleted(ByteBuffer value) {
- //first byte determines the value type
- byte valType = value.get(value.position());
- return SoplogToken.isTombstone(valType) || SoplogToken.isRemovedPhase2(valType);
- }
-
- private void checkAbort(AtomicBoolean aborted)
- throws InterruptedIOException {
- if (aborted.get()) {
- throw new InterruptedIOException();
- }
- }
-
- private void deleteInactive(TrackedReference<SortedOplogReader> tr)
- throws IOException {
- tr.get().close();
- if (tr.get().getFile().delete()) {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Deleted inactive soplog {}", this.logPrefix, tr.get().getFile());
- }
-
- tracker.fileDeleted(tr.get().getFile());
- factory.getConfiguration().getStatistics().incInactiveFiles(-1);
- }
- }
-
- /**
- * Organizes a set of soplogs for a given level.
- */
- protected static abstract class Level {
- /** the level ordinal position */
- protected final int level;
-
- public Level(int level) {
- this.level = level;
- }
-
- @Override
- public String toString() {
- return String.valueOf(level);
- }
-
- /**
- * Returns true if the level needs compaction.
- * @return true if compaction is needed
- */
- protected abstract boolean needsCompaction();
-
- /**
- * Obtains the current set of active soplogs for this level.
- * @return the soplog snapshot
- */
- protected List<TrackedReference<SortedOplogReader>> getSnapshot() {
- return getSnapshot(null, null);
- }
-
- /**
- * Obtains the current set of active soplogs for this level, optionally
- * bounded by the start and end keys.
- *
- * @param start the start key
- * @param end the end key
- * @return the soplog snapshot
- */
- protected abstract List<TrackedReference<SortedOplogReader>> getSnapshot(byte[] start, byte[] end);
-
- /**
- * Clears the soplogs that match the metadata filter.
- * @throws IOException error during close
- */
- protected abstract void clear() throws IOException;
-
- /**
- * Closes the soplogs managed by this level.
- * @throws IOException error closing soplogs
- */
- protected abstract void close() throws IOException;
-
- /**
- * Adds a new soplog to this level.
- *
- * @param soplog the soplog
- * @throws IOException error creating reader
- */
- protected abstract void add(SortedOplog soplog) throws IOException;
-
- /**
- * Merges the current soplogs into a new soplog and promotes it to the next
- * level. The previous soplogs are marked for deletion.
- *
- * @param aborted true if the compaction should be aborted
- * @throws IOException error unable to perform compaction
- */
- protected abstract boolean compact(AtomicBoolean aborted) throws IOException;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/AbstractKeyValueIterator.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/AbstractKeyValueIterator.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/AbstractKeyValueIterator.java
deleted file mode 100644
index 1326d5c..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/AbstractKeyValueIterator.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-/**
- * Provides an {@link Iterator} view over a collection of keys and values. The
- * implementor must provide access to the current key/value as well as a means
- * to move to the next pair.
- *
- * @author bakera
- *
- * @param <K> the key type
- * @param <V> the value type
- */
-public abstract class AbstractKeyValueIterator<K, V> implements KeyValueIterator<K, V> {
- /** true if the iterator has been advanced to the next element */
- private boolean foundNext = false;
-
- @Override
- public boolean hasNext() {
- if (!foundNext) {
- foundNext = step();
- }
- return foundNext;
- }
-
- @Override
- public K next() {
- if (!foundNext && !step()) {
- throw new NoSuchElementException();
- }
-
- foundNext = false;
- return key();
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- /**
- * Returns the key at the current position.
- * @return the key
- */
- public abstract K key();
-
- /**
- * Returns the value at the current position.
- * @return the value
- */
- public abstract V value();
-
- /**
- * Steps the iteration to the next position.
- * @return true if the step succeeded
- */
- protected abstract boolean step();
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/AbstractSortedReader.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/AbstractSortedReader.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/AbstractSortedReader.java
deleted file mode 100644
index c11e1e0..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/AbstractSortedReader.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-
-/**
- * Provides default behavior for range scans.
- *
- * @author bakera
- */
-public abstract class AbstractSortedReader implements SortedReader<ByteBuffer> {
- @Override
- public final SortedIterator<ByteBuffer> scan() throws IOException {
- return scan(null, true, null, true);
- }
-
- @Override
- public final SortedIterator<ByteBuffer> head(byte[] to, boolean inclusive) throws IOException{
- return scan(null, true, to, inclusive);
- }
-
- @Override
- public final SortedIterator<ByteBuffer> tail(byte[] from, boolean inclusive) throws IOException{
- return scan(from, inclusive, null, true);
- }
-
- @Override
- public final SortedIterator<ByteBuffer> scan(byte[] from, byte[] to) throws IOException{
- return scan(from, true, to, false);
- }
-
- @Override
- public final SortedIterator<ByteBuffer> scan(byte[] equalTo) throws IOException{
- return scan(equalTo, true, equalTo, true);
- }
-
- @Override
- public SortedIterator<ByteBuffer> scan(byte[] from, boolean fromInclusive, byte[] to,
- boolean toInclusive) throws IOException{
- return scan(from, fromInclusive, to, toInclusive, true, null);
- }
-
- @Override
- public final SortedReader<ByteBuffer> withAscending(boolean ascending) {
- if (this instanceof DelegateSortedReader) {
- DelegateSortedReader tmp = (DelegateSortedReader) this;
- return new DelegateSortedReader(tmp.delegate, ascending, tmp.filter);
- }
- return new DelegateSortedReader(this, ascending, null);
- }
-
- @Override
- public final SortedReader<ByteBuffer> withFilter(MetadataFilter filter) {
- if (this instanceof DelegateSortedReader) {
- DelegateSortedReader tmp = (DelegateSortedReader) this;
- return new DelegateSortedReader(tmp.delegate, tmp.ascending, filter);
- }
- return new DelegateSortedReader(this, true, filter);
- }
-
- protected class DelegateSortedReader extends AbstractSortedReader {
- /** the embedded reader */
- private final AbstractSortedReader delegate;
-
- /** true if ascending */
- private final boolean ascending;
-
- /** the filter */
- private final MetadataFilter filter;
-
- public DelegateSortedReader(AbstractSortedReader reader, boolean ascending, MetadataFilter filter) {
- this.delegate = reader;
- this.ascending = ascending;
- this.filter = filter;
- }
-
- @Override
- public boolean mightContain(byte[] key) throws IOException {
- return delegate.mightContain(key);
- }
-
- @Override
- public ByteBuffer read(byte[] key) throws IOException {
- return delegate.read(key);
- }
-
- @Override
- public SerializedComparator getComparator() {
- return delegate.getComparator();
- }
-
- @Override
- public SortedStatistics getStatistics() throws IOException {
- return delegate.getStatistics();
- }
-
- @Override
- public void close() throws IOException {
- delegate.close();
- }
-
- @Override
- public SortedIterator<ByteBuffer> scan(
- byte[] from, boolean fromInclusive,
- byte[] to, boolean toInclusive) throws IOException {
- return scan(from, fromInclusive, to, toInclusive, ascending, filter);
- }
-
- @Override
- public SortedIterator<ByteBuffer> scan(
- byte[] from, boolean fromInclusive,
- byte[] to, boolean toInclusive,
- boolean ascending,
- MetadataFilter filter) throws IOException {
- return delegate.scan(from, fromInclusive, to, toInclusive, ascending, filter);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ArraySerializedComparator.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ArraySerializedComparator.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ArraySerializedComparator.java
deleted file mode 100644
index 139b3cb..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ArraySerializedComparator.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.nio.ByteBuffer;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SerializedComparator;
-import com.gemstone.gemfire.internal.util.Bytes;
-
-/**
- * Provides comparisons of composite keys by comparing each of the constituent
- * parts of the key in order. A subkey will only be evaluated if the preceeding
- * keys have compared as equal.
- * <p>
- * Prior to use, an instance must be configured with the ordered list of
- * comparators to apply.
- * <p>
- * The keys for an N-composite are stored as follows:
- * <pre>
- * | len[0] | key[0] | len[1] | key[1] | ... | len[N-2] | key[N-2] | key[N-1] |
- * </pre>
- * where the key length is stored as a protobuf varint.
- *
- * @author bakera
- */
-public class ArraySerializedComparator implements CompositeSerializedComparator,
-DelegatingSerializedComparator {
-
- /** the comparators */
- private volatile SerializedComparator[] comparators;
-
- /**
- * Injects the comparators to be used on composite keys. The number and order
- * must match the key.
- *
- * @param comparators the comparators
- */
- public void setComparators(SerializedComparator[] comparators) {
- this.comparators = comparators;
- }
-
- @Override
- public int compare(byte[] o1, byte[] o2) {
- return compare(o1, 0, o1.length, o2, 0, o2.length);
- }
-
- @Override
- public int compare(byte[] b1, int o1, int l1, byte[] b2, int o2, int l2) {
- SerializedComparator[] sc = comparators;
-
- int off1 = o1;
- int off2 = o2;
- for (int i = 0; i < sc.length - 1; i++) {
- int klen1 = Bytes.getVarInt(b1, off1);
- int klen2 = Bytes.getVarInt(b2, off2);
-
- off1 += Bytes.sizeofVarInt(klen1);
- off2 += Bytes.sizeofVarInt(klen2);
-
- if (!SoplogToken.isWildcard(b1, off1, b2, off2)) {
- int diff = sc[i].compare(b1, off1, klen1, b2, off2, klen2);
- if (diff != 0) {
- return diff;
- }
- }
- off1 += klen1;
- off2 += klen2;
- }
-
- if (!SoplogToken.isWildcard(b1, off1, b2, off2)) {
- l1 -= (off1 - o1);
- l2 -= (off2 - o2);
- return sc[sc.length - 1].compare(b1, off1, l1, b2, off2, l2);
- }
- return 0;
- }
-
- @Override
- public SerializedComparator[] getComparators() {
- return comparators;
- }
-
- @Override
- public byte[] createCompositeKey(byte[] key1, byte[] key2) {
- return createCompositeKey(new byte[][] { key1, key2 });
- }
-
- @Override
- public byte[] createCompositeKey(byte[]... keys) {
- assert comparators.length == keys.length;
-
- int size = 0;
- for (int i = 0; i < keys.length - 1; i++) {
- size += keys[i].length + Bytes.sizeofVarInt(keys[i].length);
- }
- size += keys[keys.length - 1].length;
-
- // TODO do we have to do a copy here or can we delay until the disk write?
- int off = 0;
- byte[] buf = new byte[size];
- for (int i = 0; i < keys.length - 1; i++) {
- off = Bytes.putVarInt(keys[i].length, buf, off);
- System.arraycopy(keys[i], 0, buf, off, keys[i].length);
- off += keys[i].length;
- }
- System.arraycopy(keys[keys.length - 1], 0, buf, off, keys[keys.length - 1].length);
- return buf;
- }
-
- @Override
- public ByteBuffer getKey(ByteBuffer key, int ordinal) {
- assert ordinal < comparators.length;
-
- for (int i = 0; i < comparators.length - 1; i++) {
- int klen = Bytes.getVarInt(key);
- if (i == ordinal) {
- ByteBuffer subkey = (ByteBuffer) key.slice().limit(klen);
- key.rewind();
-
- return subkey;
- }
- key.position(key.position() + klen);
- }
-
- ByteBuffer subkey = key.slice();
- key.rewind();
-
- return subkey;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/Compactor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/Compactor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/Compactor.java
deleted file mode 100644
index c80f118..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/Compactor.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.SortedMap;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplog.SortedOplogReader;
-
-/**
- * Defines a mechanism to track and compact soplogs.
- *
- * @author bakera
- */
-public interface Compactor {
- /**
- * Compares metadata values.
- */
- public interface MetadataCompactor {
- /**
- * Combines two metadata values into a single value. Used during compaction
- * to merge metadata between soplog files.
- *
- * @param metadata1 the first value
- * @param metadata2 the second value
- * @return the combined metadata
- */
- byte[] compact(byte[] metadata1, byte[] metadata2);
- }
-
- /**
- * Provides notification on the status of a compaction.
- */
- public interface CompactionHandler {
- /**
- * Invoked when a compaction operation has completed successfully.
- * @param compacted true if any files were compacted
- */
- void complete(boolean compacted);
-
- /**
- * Invoked when a compaction operation has failed.
- * @param ex the failure
- */
- void failed(Throwable ex);
- }
-
- /**
- * Provides external configuration of file operations for recovering and
- * new file creation.
- *
- * @param <T> the compaction info
- */
- public interface Fileset<T extends Comparable<T>> {
- /**
- * Returns the set of active soplogs.
- * @return the active files
- */
- SortedMap<T, ? extends Iterable<File>> recover();
-
- /**
- * Returns the pathname for the next soplog.
- * @return the soplog filename
- */
- File getNextFilename();
- }
-
- /**
- * Provides a mechanism to coordinate file changes to the levels managed
- * by the compactor.
- *
- * @param T the attachment type
- */
- public interface CompactionTracker<T extends Comparable<T>> {
- /**
- * Invoked when a new file is added.
- * @param f the file
- * @param attach the attachment
- */
- void fileAdded(File f, T attach);
-
- /**
- * Invoked when a file is removed.
- * @param f the file
- * @param attach the attachment
- */
- void fileRemoved(File f, T attach);
-
- /**
- * Invoked when a file is deleted.
- * @param f the attachment
- */
- void fileDeleted(File f);
- }
-
- /**
- * Synchronously invokes the force compaction operation and waits for completion.
- *
- * @return true if any files were compacted
- * @throws IOException error during compaction
- */
- boolean compact() throws IOException;
-
- /**
- * Requests a compaction operation be performed on the soplogs. This invocation
- * may block if there are too many outstanding write requests.
- *
- * @param force if false, compaction will only be performed if necessary
- * @param ch invoked when the compaction is complete, optionally null
- * @throws IOException error during compaction
- */
- void compact(boolean force, CompactionHandler ch);
-
- /**
- * Returns the active readers for the given key range. The caller is responsible
- * for decrementing the use count of each reader when finished.
- *
- * @param start the start key inclusive, or null for beginning
- * @param end the end key inclusive, or null for last
- * @return the readers
- *
- * @see TrackedReference
- */
- Collection<TrackedReference<SortedOplogReader>> getActiveReaders(
- byte[] start, byte[] end);
-
- /**
- * Adds a new soplog to the active set.
- * @param soplog the soplog
- * @throws IOException unable to add soplog
- */
- void add(SortedOplog soplog) throws IOException;
-
- /**
- * Returns the compaction tracker for coordinating changes to the file set.
- * @return the tracker
- */
- CompactionTracker<?> getTracker();
-
- /**
- * Returns the file manager for managing the soplog files.
- * @return the fileset
- */
- Fileset<?> getFileset();
-
- /**
- * Clears the active files managed by the compactor. Files will be marked as
- * inactive and eventually deleted.
- *
- * @throws IOException unable to clear
- */
- void clear() throws IOException;
- /**
- * Closes the compactor.
- * @throws IOException unable to close
- */
- void close() throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/CompositeSerializedComparator.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/CompositeSerializedComparator.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/CompositeSerializedComparator.java
deleted file mode 100644
index 8d9aae5..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/CompositeSerializedComparator.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.nio.ByteBuffer;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SerializedComparator;
-
-/**
- * Creates and compares composite keys.
- *
- * @author bakera
- */
-public interface CompositeSerializedComparator extends SerializedComparator {
- /**
- * Constructs a composite key consisting of a primary key and a secondary key.
- *
- * @param key1 the primary key
- * @param key2 the secondary key
- * @return the composite key
- */
- public byte[] createCompositeKey(byte[] key1, byte[] key2);
-
- /**
- * Constructs a composite key by combining the supplied keys. The number of
- * keys and their order must match the comparator set.
- * <p>
- * The <code>WILDCARD_KEY</code> token may be used to match all subkeys in the
- * given ordinal position. This is useful when constructing a search key to
- * retrieve all keys for a given primary key, ignoring the remaining subkeys.
- *
- * @param keys the keys, ordered by sort priority
- * @return the composite key
- */
- public byte[] createCompositeKey(byte[]... keys);
-
- /**
- * Returns subkey for the given ordinal position.
- * @param key the composite key
- * @return the subkey
- */
- public ByteBuffer getKey(ByteBuffer key, int ordinal);
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/IndexSerializedComparator.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/IndexSerializedComparator.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/IndexSerializedComparator.java
deleted file mode 100644
index 816eea0..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/IndexSerializedComparator.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.nio.ByteBuffer;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SerializedComparator;
-import com.gemstone.gemfire.internal.util.Bytes;
-
-/**
- * Provides a comparator for composite keys of the form (k0, k1). The primary
- * keys are compared lexicographically while the secondary keys are compared
- * bitwise. The key format includes the primary key length to avoid deserialization
- * the secondary key when reading:
- * <pre>
- * | varint | primary key | secondary key |
- * </pre>
- * The key length is encoded using a protobuf-style varint.
- * <p>
- *
- * @author bakera
- */
-public class IndexSerializedComparator implements CompositeSerializedComparator,
-DelegatingSerializedComparator {
-
- private volatile SerializedComparator primary;
- private volatile SerializedComparator secondary;
-
- public IndexSerializedComparator() {
- primary = new LexicographicalComparator();
- secondary = new ByteComparator();
- }
-
- @Override
- public void setComparators(SerializedComparator[] comparators) {
- assert comparators.length == 2;
-
- primary = comparators[0];
- secondary = comparators[1];
- }
-
- @Override
- public SerializedComparator[] getComparators() {
- return new SerializedComparator[] { primary, secondary };
- }
-
- @Override
- public int compare(byte[] o1, byte[] o2) {
- return compare(o1, 0, o1.length, o2, 0, o2.length);
- }
-
- @Override
- public int compare(byte[] b1, int o1, int l1, byte[] b2, int o2, int l2) {
- int klen1 = Bytes.getVarInt(b1, o1);
- int klen2 = Bytes.getVarInt(b2, o2);
-
- int off1 = o1 + Bytes.sizeofVarInt(klen1);
- int off2 = o2 + Bytes.sizeofVarInt(klen2);
-
- // skip the comparison operation if there is a SearchToken.WILDCARD
- if (!SoplogToken.isWildcard(b1, off1, b2, off2)) {
- int diff = primary.compare(b1, off1, klen1, b2, off2, klen2);
- if (diff != 0) {
- return diff;
- }
- }
- off1 += klen1;
- off2 += klen2;
-
- if (!SoplogToken.isWildcard(b1, off1, b2, off2)) {
- l1 -= (off1 - o1);
- l2 -= (off2 - o2);
- return secondary.compare(b1, off1, l1, b2, off2, l2);
- }
- return 0;
- }
-
- @Override
- public ByteBuffer getKey(ByteBuffer key, int ordinal) {
- assert ordinal < 2;
-
- ByteBuffer subkey;
- int klen = Bytes.getVarInt(key);
- if (ordinal == 0) {
- subkey = (ByteBuffer) key.slice().limit(klen);
-
- } else {
- subkey = ((ByteBuffer) key.position(key.position() + klen)).slice();
- }
-
- key.rewind();
- return subkey;
- }
-
- @Override
- public byte[] createCompositeKey(byte[] key1, byte[] key2) {
- int vlen = Bytes.sizeofVarInt(key1.length);
- byte[] buf = new byte[vlen + key1.length + key2.length];
-
- Bytes.putVarInt(key1.length, buf, 0);
- System.arraycopy(key1, 0, buf, vlen, key1.length);
- System.arraycopy(key2, 0, buf, vlen + key1.length, key2.length);
-
- return buf;
- }
-
- @Override
- public byte[] createCompositeKey(byte[]... keys) {
- assert keys.length == 2;
-
- return createCompositeKey(keys[0], keys[1]);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/LevelTracker.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/LevelTracker.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/LevelTracker.java
deleted file mode 100644
index a590283..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/LevelTracker.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.io.Closeable;
-import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.LineNumberReader;
-import java.io.Writer;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.Compactor.CompactionTracker;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.Compactor.Fileset;
-
-/**
- * A simple, non-robust file tracker for tracking soplogs by level.
- *
- * @author bakera
- */
-public class LevelTracker implements Fileset<Integer>, CompactionTracker<Integer>, Closeable {
- private final String name;
- private final File manifest;
-
- private final SortedMap<Integer, Set<File>> levels;
- private final AtomicLong file;
-
- public LevelTracker(String name, File manifest) throws IOException {
- this.name = name;
- this.manifest = manifest;
- file = new AtomicLong(0);
-
- levels = new TreeMap<Integer, Set<File>>();
- if (!manifest.exists()) {
- return;
- }
-
- LineNumberReader rdr = new LineNumberReader(new FileReader(manifest));
- try {
- String line;
- while ((line = rdr.readLine()) != null) {
- String[] parts = line.split(",");
- int level = Integer.parseInt(parts[0]);
- File f = new File(parts[1]);
- add(f, level);
- }
- } finally {
- rdr.close();
- }
- }
-
- @Override
- public SortedMap<Integer, ? extends Iterable<File>> recover() {
- return levels;
- }
-
- @Override
- public File getNextFilename() {
- return new File(manifest.getParentFile(), name + "-" + System.currentTimeMillis()
- + "-" + file.getAndIncrement() + ".soplog");
- }
-
- @Override
- public void fileAdded(File f, Integer attach) {
- add(f, attach);
- }
-
- @Override
- public void fileRemoved(File f, Integer attach) {
- levels.get(attach).remove(f);
- }
-
- @Override
- public void fileDeleted(File f) {
- }
-
- @Override
- public void close() throws IOException {
- Writer wtr = new FileWriter(manifest);
- try {
- for (Map.Entry<Integer, Set<File>> entry : levels.entrySet()) {
- for (File f : entry.getValue()) {
- wtr.write(entry.getKey() + "," + f + "\n");
- }
- }
- } finally {
- wtr.flush();
- wtr.close();
- }
- }
-
- private void add(File f, int level) {
- Set<File> files = levels.get(level);
- if (files == null) {
- files = new HashSet<File>();
- levels.put(level, files);
- }
- files.add(f);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/LexicographicalComparator.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/LexicographicalComparator.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/LexicographicalComparator.java
deleted file mode 100644
index 24fba50..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/LexicographicalComparator.java
+++ /dev/null
@@ -1,460 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-
-import com.gemstone.gemfire.DataSerializer;
-import com.gemstone.gemfire.internal.DSCODE;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SerializedComparator;
-import com.gemstone.gemfire.internal.util.Bytes;
-
-/**
- * Provides type-optimized comparisons for serialized objects. All data is
- * assumed to have been serialized via a call to
- * {@link DataSerializer#writeObject(Object, java.io.DataOutput) }. The following
- * data types have optimized comparisons:
- * <ul>
- * <li>boolean
- * <li>byte
- * <li>short
- * <li>char
- * <li>int
- * <li>long
- * <li>float
- * <li>double
- * <li>String (not {@link DSCODE#HUGE_STRING} or {@link DSCODE#HUGE_STRING_BYTES})
- * </ul>
- * Types that are not listed above fallback to deserialization and comparison
- * via the {@link Comparable} API.
- * <p>
- * Any numeric type may be compared against another numeric type (e.g. double
- * to int).
- * <p>
- * <strong>Any changes to the serialized format may cause version incompatibilities.
- * In addition, the comparison operations will need to be updated.</strong>
- * <p>
- *
- * @author bakera
- */
-public class LexicographicalComparator implements SerializedComparator {
-
- //////////////////////////////////////////////////////////////////////////////
- //
- // constants for any-to-any numeric comparisons
- //
- //////////////////////////////////////////////////////////////////////////////
-
- private static final int BYTE_TO_BYTE = DSCODE.BYTE << 8 | DSCODE.BYTE;
- private static final int BYTE_TO_SHORT = DSCODE.BYTE << 8 | DSCODE.SHORT;
- private static final int BYTE_TO_INT = DSCODE.BYTE << 8 | DSCODE.INTEGER;
- private static final int BYTE_TO_LONG = DSCODE.BYTE << 8 | DSCODE.LONG;
- private static final int BYTE_TO_FLOAT = DSCODE.BYTE << 8 | DSCODE.FLOAT;
- private static final int BYTE_TO_DOUBLE = DSCODE.BYTE << 8 | DSCODE.DOUBLE;
-
- private static final int SHORT_TO_BYTE = DSCODE.SHORT << 8 | DSCODE.BYTE;
- private static final int SHORT_TO_SHORT = DSCODE.SHORT << 8 | DSCODE.SHORT;
- private static final int SHORT_TO_INT = DSCODE.SHORT << 8 | DSCODE.INTEGER;
- private static final int SHORT_TO_LONG = DSCODE.SHORT << 8 | DSCODE.LONG;
- private static final int SHORT_TO_FLOAT = DSCODE.SHORT << 8 | DSCODE.FLOAT;
- private static final int SHORT_TO_DOUBLE = DSCODE.SHORT << 8 | DSCODE.DOUBLE;
-
- private static final int LONG_TO_BYTE = DSCODE.LONG << 8 | DSCODE.BYTE;
- private static final int LONG_TO_SHORT = DSCODE.LONG << 8 | DSCODE.SHORT;
- private static final int LONG_TO_INT = DSCODE.LONG << 8 | DSCODE.INTEGER;
- private static final int LONG_TO_LONG = DSCODE.LONG << 8 | DSCODE.LONG;
- private static final int LONG_TO_FLOAT = DSCODE.LONG << 8 | DSCODE.FLOAT;
- private static final int LONG_TO_DOUBLE = DSCODE.LONG << 8 | DSCODE.DOUBLE;
-
- private static final int INT_TO_BYTE = DSCODE.INTEGER<< 8 | DSCODE.BYTE;
- private static final int INT_TO_SHORT = DSCODE.INTEGER<< 8 | DSCODE.SHORT;
- private static final int INT_TO_INT = DSCODE.INTEGER<< 8 | DSCODE.INTEGER;
- private static final int INT_TO_LONG = DSCODE.INTEGER<< 8 | DSCODE.LONG;
- private static final int INT_TO_FLOAT = DSCODE.INTEGER<< 8 | DSCODE.FLOAT;
- private static final int INT_TO_DOUBLE = DSCODE.INTEGER<< 8 | DSCODE.DOUBLE;
-
- private static final int FLOAT_TO_BYTE = DSCODE.FLOAT << 8 | DSCODE.BYTE;
- private static final int FLOAT_TO_SHORT = DSCODE.FLOAT << 8 | DSCODE.SHORT;
- private static final int FLOAT_TO_INT = DSCODE.FLOAT << 8 | DSCODE.INTEGER;
- private static final int FLOAT_TO_LONG = DSCODE.FLOAT << 8 | DSCODE.LONG;
- private static final int FLOAT_TO_FLOAT = DSCODE.FLOAT << 8 | DSCODE.FLOAT;
- private static final int FLOAT_TO_DOUBLE = DSCODE.FLOAT << 8 | DSCODE.DOUBLE;
-
- private static final int DOUBLE_TO_BYTE = DSCODE.DOUBLE << 8 | DSCODE.BYTE;
- private static final int DOUBLE_TO_SHORT = DSCODE.DOUBLE << 8 | DSCODE.SHORT;
- private static final int DOUBLE_TO_INT = DSCODE.DOUBLE << 8 | DSCODE.INTEGER;
- private static final int DOUBLE_TO_LONG = DSCODE.DOUBLE << 8 | DSCODE.LONG;
- private static final int DOUBLE_TO_FLOAT = DSCODE.DOUBLE << 8 | DSCODE.FLOAT;
- private static final int DOUBLE_TO_DOUBLE = DSCODE.DOUBLE << 8 | DSCODE.DOUBLE;
-
- //////////////////////////////////////////////////////////////////////////////
- //
- // constants for any-to-any string comparisons
- //
- //////////////////////////////////////////////////////////////////////////////
-
- private static final int STRING_TO_STRING = DSCODE.STRING << 8 | DSCODE.STRING;
- private static final int STRING_TO_STRING_BYTES = DSCODE.STRING << 8 | DSCODE.STRING_BYTES;
- private static final int STRING_BYTES_TO_STRING = DSCODE.STRING_BYTES << 8 | DSCODE.STRING;
- private static final int STRING_BYTES_TO_STRING_BYTES = DSCODE.STRING_BYTES << 8 | DSCODE.STRING_BYTES;
-
- @Override
- public int compare(byte[] o1, byte[] o2) {
- return compare(o1, 0, o1.length, o2, 0, o2.length);
- }
-
- @Override
- public int compare(byte[] b1, int o1, int l1, byte[] b2, int o2, int l2) {
- byte type1 = b1[o1];
- byte type2 = b2[o2];
-
- // optimized comparisons
- if (isString(type1) && isString(type2)) {
- return compareAsString(type1, b1, o1, type2, b2, o2);
-
- } else if (isNumeric(type1) && isNumeric(type2)) {
- return compareAsNumeric(type1, b1, o1, type2, b2, o2);
-
- } else if (type1 == DSCODE.BOOLEAN && type2 == DSCODE.BOOLEAN) {
- return compareAsBoolean(getBoolean(b1, o1), getBoolean(b2, o2));
-
- } else if (type1 == DSCODE.CHARACTER && type2 == DSCODE.CHARACTER) {
- return compareAsChar(getChar(b1, o1), getChar(b2, o2));
-
- } else if (type1 == DSCODE.NULL || type2 == DSCODE.NULL) {
- // null check, assumes NULLs sort last
- return type1 == type2 ? 0 : type1 == DSCODE.NULL ? 1 : -1;
- }
-
- // fallback, will deserialize to Comparable
- return compareAsObject(b1, o1, l1, b2, o2, l2);
- }
-
- private static boolean isNumeric(int type) {
- return type == DSCODE.BYTE
- || type == DSCODE.SHORT
- || type == DSCODE.INTEGER
- || type == DSCODE.LONG
- || type == DSCODE.FLOAT
- || type == DSCODE.DOUBLE;
- }
-
- private static boolean isString(int type) {
- return type == DSCODE.STRING
- || type == DSCODE.STRING_BYTES;
- }
-
- //////////////////////////////////////////////////////////////////////////////
- //
- // type comparisons
- //
- //////////////////////////////////////////////////////////////////////////////
-
- private static int compareAsString(byte type1, byte[] b1, int o1, byte type2, byte[] b2, int o2) {
- // TODO these comparisons do not provide true alphabetical collation
- // support (for example upper case sort before lower case). Need to use a
- // collation key instead of unicode ordinal number comparison
- switch (type1 << 8 | type2) {
- case STRING_TO_STRING:
- return compareAsStringOfUtf(b1, o1, b2, o2);
-
- case STRING_TO_STRING_BYTES:
- return -compareAsStringOfByteToUtf(b2, o2, b1, o1);
-
- case STRING_BYTES_TO_STRING:
- return compareAsStringOfByteToUtf(b1, o1, b2, o2);
-
- case STRING_BYTES_TO_STRING_BYTES:
- return compareAsStringOfByte(b1, o1, b2, o2);
-
- default:
- throw new ClassCastException(String.format("Incomparable types: %d %d", type1, type2));
- }
- }
-
- private static int compareAsNumeric(byte type1, byte[] b1, int o1, byte type2, byte[] b2, int o2) {
- switch (type1 << 8 | type2) {
- case BYTE_TO_BYTE: return compareAsShort (getByte (b1, o1), getByte (b2, o2));
- case BYTE_TO_SHORT: return compareAsShort (getByte (b1, o1), getShort (b2, o2));
- case BYTE_TO_INT: return compareAsInt (getByte (b1, o1), getInt (b2, o2));
- case BYTE_TO_LONG: return compareAsLong (getByte (b1, o1), getLong (b2, o2));
- case BYTE_TO_FLOAT: return compareAsFloat (getByte (b1, o1), getFloat (b2, o2));
- case BYTE_TO_DOUBLE: return compareAsDouble(getByte (b1, o1), getDouble(b2, o2));
-
- case SHORT_TO_BYTE: return compareAsShort (getShort (b1, o1), getByte (b2, o2));
- case SHORT_TO_SHORT: return compareAsShort (getShort (b1, o1), getShort (b2, o2));
- case SHORT_TO_INT: return compareAsInt (getShort (b1, o1), getInt (b2, o2));
- case SHORT_TO_LONG: return compareAsLong (getShort (b1, o1), getLong (b2, o2));
- case SHORT_TO_FLOAT: return compareAsFloat (getShort (b1, o1), getFloat (b2, o2));
- case SHORT_TO_DOUBLE: return compareAsDouble(getShort (b1, o1), getDouble(b2, o2));
-
- case INT_TO_BYTE: return compareAsInt (getInt (b1, o1), getByte (b2, o2));
- case INT_TO_SHORT: return compareAsInt (getInt (b1, o1), getShort (b2, o2));
- case INT_TO_INT: return compareAsInt (getInt (b1, o1), getInt (b2, o2));
- case INT_TO_LONG: return compareAsLong (getInt (b1, o1), getLong (b2, o2));
- case INT_TO_FLOAT: return compareAsFloat (getInt (b1, o1), getFloat (b2, o2));
- case INT_TO_DOUBLE: return compareAsDouble(getInt (b1, o1), getDouble(b2, o2));
-
- case LONG_TO_BYTE: return compareAsLong (getLong (b1, o1), getByte (b2, o2));
- case LONG_TO_SHORT: return compareAsLong (getLong (b1, o1), getShort (b2, o2));
- case LONG_TO_INT: return compareAsLong (getLong (b1, o1), getInt (b2, o2));
- case LONG_TO_LONG: return compareAsLong (getLong (b1, o1), getLong (b2, o2));
- case LONG_TO_FLOAT: return compareAsDouble(getLong (b1, o1), getFloat (b2, o2));
- case LONG_TO_DOUBLE: return compareAsDouble(getLong (b1, o1), getDouble(b2, o2));
-
- case FLOAT_TO_BYTE: return compareAsFloat (getFloat (b1, o1), getByte (b2, o2));
- case FLOAT_TO_SHORT: return compareAsFloat (getFloat (b1, o1), getShort (b2, o2));
- case FLOAT_TO_INT: return compareAsFloat (getFloat (b1, o1), getInt (b2, o2));
- case FLOAT_TO_LONG: return compareAsFloat (getFloat (b1, o1), getLong (b2, o2));
- case FLOAT_TO_FLOAT: return compareAsFloat (getFloat (b1, o1), getFloat (b2, o2));
- case FLOAT_TO_DOUBLE: return compareAsDouble(getFloat (b1, o1), getDouble(b2, o2));
-
- case DOUBLE_TO_BYTE: return compareAsDouble(getDouble(b1, o1), getByte (b2, o2));
- case DOUBLE_TO_SHORT: return compareAsDouble(getDouble(b1, o1), getShort (b2, o2));
- case DOUBLE_TO_INT: return compareAsDouble(getDouble(b1, o1), getInt (b2, o2));
- case DOUBLE_TO_LONG: return compareAsDouble(getDouble(b1, o1), getLong (b2, o2));
- case DOUBLE_TO_FLOAT: return compareAsDouble(getDouble(b1, o1), getFloat (b2, o2));
- case DOUBLE_TO_DOUBLE: return compareAsDouble(getDouble(b1, o1), getDouble(b2, o2));
-
- default:
- throw new ClassCastException(String.format("Incomparable types: %d %d", type1, type2));
- }
- }
-
- private static int compareAsBoolean(boolean b1, boolean b2) {
- return (b1 == b2) ? 0 : (b1 ? 1 : -1);
- }
-
- private static int compareAsShort(short s1, short s2) {
- return s1 - s2;
- }
-
- private static int compareAsChar(char c1, char c2) {
- // TODO non-collating sort
- return c1 - c2;
- }
-
- private static int compareAsInt(long l1, long l2) {
- return (int) (l1 - l2);
- }
-
- private static int compareAsLong(long l1, long l2) {
- return (l1 < l2) ? -1 : ((l1 == l2) ? 0 : 1);
- }
-
- private static int compareAsFloat(float f1, float f2) {
- return Float.compare(f1, f2);
- }
-
- private static int compareAsDouble(double d1, double d2) {
- return Double.compare(d1, d2);
- }
-
- private static int compareAsStringOfByte(byte[] b1, int o1, byte[] b2, int o2) {
- int offset = 3;
- int l1 = Bytes.toUnsignedShort(b1[o1 + 1], b1[o1 + 2]);
- int l2 = Bytes.toUnsignedShort(b2[o1 + 1], b2[o1 + 2]);
-
- assert b1.length >= o1 + offset + l1;
- assert b2.length >= o2 + offset + l2;
-
- int end = o1 + offset + Math.min(l1, l2);
- for (int i = o1 + offset, j = o2 + offset; i < end; i++, j++) {
- int diff = b1[i] - b2[j];
- if (diff != 0) {
- return diff;
- }
- }
- return l1 - l2;
- }
-
- private static int compareAsStringOfUtf(byte[] b1, int o1, byte[] b2, int o2) {
- int offset = 3;
- int l1 = Bytes.toUnsignedShort(b1[o1 + 1], b1[o1 + 2]);
- int l2 = Bytes.toUnsignedShort(b2[o1 + 1], b2[o1 + 2]);
-
- assert b1.length >= o1 + offset + l1;
- assert b2.length >= o2 + offset + l2;
-
- int i = 0;
- int j = 0;
- while (i < l1 && j < l2) {
- final int idx = o1 + offset + i;
- final int ilen = getUtfLength(b1[idx]);
- final char c1 = getUtfChar(b1, idx, ilen);
- i += ilen;
-
- final int jdx = o2 + offset + j;
- final int jlen = getUtfLength(b2[jdx]);
- char c2 = getUtfChar(b2, jdx, jlen);
- j += jlen;
-
- int diff = compareAsChar(c1, c2);
- if (diff != 0) {
- return diff;
- }
- }
- return (l1 - i) - (l2 - j);
- }
-
- private static int compareAsStringOfByteToUtf(byte[] b1, int o1, byte[] b2, int o2) {
- int offset = 3;
- int l1 = Bytes.toUnsignedShort(b1[o1 + 1], b1[o1 + 2]);
- int l2 = Bytes.toUnsignedShort(b2[o1 + 1], b2[o1 + 2]);
-
- assert b1.length >= o1 + offset + l1;
- assert b2.length >= o2 + offset + l2;
-
- int i = 0;
- int j = 0;
- while (i < l1 && j < l2) {
- final int idx = o1 + offset + i;
- final char c1 = (char) b1[idx];
- i++;
-
- final int jdx = o2 + offset + j;
- final int jlen = getUtfLength(b2[jdx]);
- char c2 = getUtfChar(b2, jdx, jlen);
- j += jlen;
-
- int diff = compareAsChar(c1, c2);
- if (diff != 0) {
- return diff;
- }
- }
- return (l1 - i) - (l2 - j);
- }
-
- private static int compareAsObject(byte[] b1, int o1, int l1, byte[] b2, int o2, int l2) {
- DataInput in1 = new DataInputStream(new ByteArrayInputStream(b1, o1, l1));
- DataInput in2 = new DataInputStream(new ByteArrayInputStream(b2, o2, l2));
-
- try {
- Comparable<Object> obj1 = DataSerializer.readObject(in1);
- Comparable<Object> obj2 = DataSerializer.readObject(in2);
-
- return obj1.compareTo(obj2);
-
- } catch (Exception e) {
- throw (RuntimeException) new ClassCastException().initCause(e);
- }
- }
-
- //////////////////////////////////////////////////////////////////////////////
- //
- //
- // Get a char from modified UTF8, as defined by DataInput.readUTF().
- //
- //////////////////////////////////////////////////////////////////////////////
-
- private static int getUtfLength(byte b) {
- int c = b & 0xff;
-
- // 0xxxxxxx
- if (c < 0x80) {
- return 1;
-
- // 110xxxxx 10xxxxxx
- } else if (c < 0xe0) {
- return 2;
- }
-
- // 1110xxxx 10xxxxxx 10xxxxxx
- return 3;
- }
-
- private static char getUtfChar(byte[] b, int off, int len) {
- assert b.length >= off + len;
- switch (len) {
- case 1:
- return (char) b[off];
- case 2:
- return getUtf2(b, off);
- case 3:
- default:
- return getUtf3(b, off);
- }
- }
-
- private static char getUtf2(byte[] b, int off) {
- assert b.length >= off + 2;
- assert (b[off] & 0xff) >= 0xc0;
- assert (b[off] & 0xff) < 0xe0;
- assert (b[off + 1] & 0xff) >= 0x80;
-
- return (char) (((b[off] & 0x1f) << 6) | (b[off + 1] & 0x3f));
- }
-
- private static char getUtf3(byte[] b, int off) {
- assert b.length >= off + 3;
- assert (b[off] & 0xff) >= 0xe0;
- assert (b[off + 1] & 0xff) >= 0x80;
- assert (b[off + 2] & 0xff) >= 0x80;
-
- return (char) (((b[off] & 0x0f) << 12) | ((b[off + 1] & 0x3f) << 6) | (b[off + 2] & 0x3f));
- }
-
-
- //////////////////////////////////////////////////////////////////////////////
- //
- // Get a serialized primitive from byte[]; b[0] is the DSCODE.
- //
- //////////////////////////////////////////////////////////////////////////////
-
- private static boolean getBoolean(byte[] b, int off) {
- assert b.length >= off + 2;
- return b[off + 1] != 0;
- }
-
- private static byte getByte(byte[] b, int off) {
- assert b.length >= off + 2;
- return b[off + 1];
- }
-
- private static short getShort(byte[] b, int off) {
- assert b.length >= off + 3;
- return Bytes.toShort(b[off + 1], b[off + 2]);
- }
-
- private static char getChar(byte[] b, int off) {
- assert b.length >= off + 3;
- return Bytes.toChar(b[off + 1], b[off + 2]);
- }
-
- private static int getInt(byte[] b, int off) {
- assert b.length >= off + 5;
- return Bytes.toInt(b[off + 1], b[off + 2], b[off + 3], b[off + 4]);
- }
-
- private static long getLong(byte[] b, int off) {
- assert b.length >= off + 9;
- return Bytes.toLong(b[off + 1], b[off + 2], b[off + 3], b[off + 4],
- b[off + 5], b[off + 6], b[off + 7], b[off + 8]);
- }
-
- private static float getFloat(byte[] b, int off) {
- assert b.length >= off + 5;
- return Float.intBitsToFloat(getInt(b, off));
- }
-
- private static double getDouble(byte[] b, int off) {
- assert b.length >= off + 9;
- return Double.longBitsToDouble(getLong(b, off));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/NonCompactor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/NonCompactor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/NonCompactor.java
deleted file mode 100644
index 697ac18..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/NonCompactor.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Deque;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplog.SortedOplogReader;
-
-/**
- * Provides a compactor that does no compaction, primarily for testing purposes.
- *
- * @author bakera
- */
-public class NonCompactor implements Compactor {
- /** the fileset */
- private final Fileset<Integer> fileset;
-
- /** the current readers */
- private final Deque<TrackedReference<SortedOplogReader>> readers;
-
- public static Fileset<Integer> createFileset(final String name, final File dir) {
- return new Fileset<Integer>() {
- private final AtomicLong file = new AtomicLong(0);
-
- @Override
- public SortedMap<Integer, ? extends Iterable<File>> recover() {
- return new TreeMap<Integer, Iterable<File>>();
- }
-
- @Override
- public File getNextFilename() {
- return new File(dir, name + "-" + System.currentTimeMillis() + "-"
- + file.getAndIncrement() + ".soplog");
- }
- };
- }
- public NonCompactor(String name, File dir) {
- fileset = createFileset(name, dir);
- readers = new ArrayDeque<TrackedReference<SortedOplogReader>>();
- }
-
- @Override
- public boolean compact() throws IOException {
- // liar!
- return true;
- }
-
- @Override
- public void compact(boolean force, CompactionHandler cd) {
- }
-
- @Override
- public synchronized Collection<TrackedReference<SortedOplogReader>> getActiveReaders(
- byte[] start, byte[] end) {
- for (TrackedReference<SortedOplogReader> tr : readers) {
- tr.increment();
- }
- return new ArrayList<TrackedReference<SortedOplogReader>>(readers);
- }
-
- @Override
- public void add(SortedOplog soplog) throws IOException {
- readers.addFirst(new TrackedReference<SortedOplogReader>(soplog.createReader()));
- }
-
- @Override
- public synchronized void clear() throws IOException {
- for (TrackedReference<SortedOplogReader> tr : readers) {
- tr.get().close();
- readers.remove(tr);
- }
- }
-
- @Override
- public synchronized void close() throws IOException {
- clear();
- }
-
- @Override
- public CompactionTracker<Integer> getTracker() {
- return null;
- }
-
- @Override
- public Fileset<Integer> getFileset() {
- return fileset;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ReversingSerializedComparator.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ReversingSerializedComparator.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ReversingSerializedComparator.java
deleted file mode 100644
index b18919d..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ReversingSerializedComparator.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SerializedComparator;
-
-/**
- * Reverses the ordering imposed by the underlying comparator. Use this to
- * change from an ascending to a descending order or vice versa.
- * <p>
- * Prior to use, an instance must be configured with a comparator for delegation
- * of the comparison operations.
- *
- * @author bakera
- */
-public class ReversingSerializedComparator implements DelegatingSerializedComparator {
- private volatile SerializedComparator delegate;
-
- @Override
- public void setComparators(SerializedComparator[] sc) {
- assert sc.length == 0;
- delegate = sc[0];
- }
-
- @Override
- public SerializedComparator[] getComparators() {
- return new SerializedComparator[] { delegate };
- }
-
- @Override
- public int compare(byte[] o1, byte[] o2) {
- return compare(o1, 0, o1.length, o2, 0, o2.length);
- }
-
- @Override
- public int compare(byte[] b1, int o1, int l1, byte[] b2, int o2, int l2) {
- return delegate.compare(b2, o2, l2, b1, o1, l1);
- }
-
- /**
- * Returns a comparator that reverses the ordering imposed by the supplied
- * comparator.
- *
- * @param sc the original comparator
- * @return the reversed comparator
- */
- public static SerializedComparator reverse(SerializedComparator sc) {
- ReversingSerializedComparator rev = new ReversingSerializedComparator();
- rev.delegate = sc;
-
- return rev;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SizeTieredCompactor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SizeTieredCompactor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SizeTieredCompactor.java
deleted file mode 100644
index 5976ad0..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SizeTieredCompactor.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplog.SortedOplogReader;
-
-/**
- * Implements a size-tiered compaction scheme in which the soplogs are organized
- * by levels of increasing size. Each level is limited to a fixed number of
- * files, <code>M</code>. Given an initial size of <code>N</code> the amount of
- * disk space consumed by a level <code>L</code> is <code>M * N^(L+1)</code>.
- * <p>
- * During compaction, this approach will temporarily double the amount of space
- * consumed by the level. Compactions are performed on a background thread.
- * <p>
- * Soplogs that have been compacted will be moved to the inactive list where they
- * will be deleted once they are no longer in use.
- *
- * @author bakera
- */
-public class SizeTieredCompactor extends AbstractCompactor<Integer> {
- /** restricts the number of soplogs per level */
- private final int maxFilesPerLevel;
-
- // TODO consider relaxing the upper bound so the levels are created dynamically
- /** restricts the number of levels; files in maxLevel are not compacted */
- private final int maxLevels;
-
- public SizeTieredCompactor(SortedOplogFactory factory,
- Fileset<Integer> fileset, CompactionTracker<Integer> tracker,
- Executor exec, int maxFilesPerLevel, int maxLevels)
- throws IOException {
- super(factory, fileset, tracker, exec);
-
- assert maxFilesPerLevel > 0;
- assert maxLevels > 0;
-
- this.maxFilesPerLevel = maxFilesPerLevel;
- this.maxLevels = maxLevels;
-
- final boolean isDebugEnabled = logger.isDebugEnabled();
- if (isDebugEnabled) {
- logger.debug("{}Creating size-tiered compactor", super.logPrefix);
- }
-
- for (int i = 0; i < maxLevels; i++) {
- levels.add(new OrderedLevel(i));
- }
-
- for (Map.Entry<Integer, ? extends Iterable<File>> entry : fileset.recover().entrySet()) {
- int level = Math.min(maxLevels - 1, entry.getKey());
- for (File f : entry.getValue()) {
- if (isDebugEnabled) {
- logger.debug("{}Adding {} to level {}", super.logPrefix, f, level);
- }
- levels.get(level).add(factory.createSortedOplog(f));
- }
- }
- }
-
- @Override
- public String toString() {
- return String.format("%s <%d/%d>", factory.getConfiguration().getName(), maxFilesPerLevel, maxLevels);
- }
-
- /**
- * Organizes a set of soplogs for a given level. All operations on the
- * soplogs are synchronized via the instance monitor.
- */
- protected class OrderedLevel extends Level {
- /** the ordered set of soplog readers */
- private final Deque<TrackedReference<SortedOplogReader>> soplogs;
-
- /** true if the level is being compacted */
- private final AtomicBoolean isCompacting;
-
- public OrderedLevel(int level) {
- super(level);
- soplogs = new ArrayDeque<TrackedReference<SortedOplogReader>>(maxFilesPerLevel);
- isCompacting = new AtomicBoolean(false);
- }
-
- @Override
- protected synchronized boolean needsCompaction() {
- // TODO this is safe but overly conservative...we need to allow parallel
- // compaction of a level such that we guarantee completion order and handle
- // errors
- return !isCompacting.get()
- && soplogs.size() >= maxFilesPerLevel
- && level != maxLevels - 1;
- }
-
- @Override
- protected List<TrackedReference<SortedOplogReader>> getSnapshot(byte[] start, byte[] end) {
- // ignoring range limits since keys are stored in overlapping files
- List<TrackedReference<SortedOplogReader>> snap;
- synchronized (this) {
- snap = new ArrayList<TrackedReference<SortedOplogReader>>(soplogs);
- }
-
- for (TrackedReference<SortedOplogReader> tr : snap) {
- tr.increment();
- }
- return snap;
- }
-
- @Override
- protected synchronized void clear() throws IOException {
- for (TrackedReference<SortedOplogReader> tr : soplogs) {
- tr.get().close();
- }
- markAsInactive(soplogs, level);
- soplogs.clear();
- }
-
- @Override
- protected synchronized void close() throws IOException {
- for (TrackedReference<SortedOplogReader> tr : soplogs) {
- tr.get().close();
- factory.getConfiguration().getStatistics().incActiveFiles(-1);
- }
- soplogs.clear();
- }
-
- @Override
- protected void add(SortedOplog soplog) throws IOException {
- SortedOplogReader rdr = soplog.createReader();
- synchronized (this) {
- soplogs.addFirst(new TrackedReference<SortedOplogReader>(rdr));
- }
-
- if (logger.isDebugEnabled()) {
- logger.debug("{}Added file {} to level {}", SizeTieredCompactor.super.logPrefix, rdr, level);
- }
- tracker.fileAdded(rdr.getFile(), level);
- factory.getConfiguration().getStatistics().incActiveFiles(1);
- }
-
- @Override
- protected boolean compact(AtomicBoolean aborted) throws IOException {
- assert level < maxLevels : "Can't compact level: " + level;
-
- if (!isCompacting.compareAndSet(false, true)) {
- // another thread won so gracefully bow out
- return false;
- }
-
- try {
- List<TrackedReference<SortedOplogReader>> snapshot = getSnapshot(null, null);
- try {
- SortedOplog merged = merge(snapshot, level == maxLevels - 1, aborted);
-
- synchronized (this) {
- if (merged != null) {
- levels.get(Math.min(level + 1, maxLevels - 1)).add(merged);
- }
- markAsInactive(snapshot, level);
- soplogs.removeAll(snapshot);
- }
- } catch (InterruptedIOException e) {
- if (logger.isDebugEnabled()) {
- logger.debug("{}Aborting compaction of level {}", SizeTieredCompactor.super.logPrefix, level);
- }
- return false;
- }
- return true;
- } finally {
- boolean set = isCompacting.compareAndSet(true, false);
- assert set;
- }
- }
- }
-}