You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2018/11/07 07:15:48 UTC
[03/10] asterixdb git commit: [ASTERIXDB-2422][STO] Introduce
compressed storage
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/compression/ICompressorDecompressor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/compression/ICompressorDecompressor.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/compression/ICompressorDecompressor.java
new file mode 100644
index 0000000..3a7e901
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/compression/ICompressorDecompressor.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.compression;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * An API for block compressor/decompressor.
+ *
+ * Note: Implementations should never allocate buffers in compress/uncompress and must be stateless to be thread safe.
+ */
+public interface ICompressorDecompressor {
+ /**
+ * Computes the required buffer size for <i>compress()</i>.
+ *
+ * @param uBufferSize
+ * The size of the uncompressed buffer.
+ * @return The required buffer size for compression
+ */
+ int computeCompressedBufferSize(int uBufferSize);
+
+ /**
+ * Compresses <i>uBuffer</i> into <i>cBuffer</i>.
+ *
+ * @param uBuffer
+ * Uncompressed source buffer
+ * @param cBuffer
+ * Compressed destination buffer
+ * @return Buffer after compression ({@link ByteBuffer#limit()} is set to the compressed size)
+ * @throws HyracksDataException
+ */
+ ByteBuffer compress(ByteBuffer uBuffer, ByteBuffer cBuffer) throws HyracksDataException;
+
+ /**
+ * Uncompresses <i>cBuffer</i> into <i>uBuffer</i>.
+ *
+ * @param cBuffer
+ * Compressed source buffer
+ * @param uBuffer
+ * Uncompressed destination buffer
+ * @return Buffer after decompression ({@link ByteBuffer#limit()} is set to the uncompressed size)
+ * @throws HyracksDataException
+ * An exception will be thrown if the <i>uBuffer</i> size is not sufficient.
+ */
+ ByteBuffer uncompress(ByteBuffer cBuffer, ByteBuffer uBuffer) throws HyracksDataException;
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/compression/ICompressorDecompressorFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/compression/ICompressorDecompressorFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/compression/ICompressorDecompressorFactory.java
new file mode 100644
index 0000000..b813afb
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/compression/ICompressorDecompressorFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.compression;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.io.IJsonSerializable;
+
+/**
+ * {@link ICompressorDecompressor} factory.
+ *
+ * A new implementation of this interface must also provide two methods if the compression is intended for storage:
+ * - {@link IJsonSerializable#toJson(org.apache.hyracks.api.io.IPersistedResourceRegistry)}
+ * - a static method {@code fromJson(IPersistedResourceRegistry registry, JsonNode json)}
+ */
+public interface ICompressorDecompressorFactory extends Serializable, IJsonSerializable {
+ /**
+ * Creates a compressor/decompressor instance.
+ *
+ * @return {@code ICompressorDecompressor}
+ */
+ ICompressorDecompressor createInstance();
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/FileReference.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/FileReference.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/FileReference.java
index 4ded855..8b93d07 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/FileReference.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/FileReference.java
@@ -27,7 +27,7 @@ import java.util.Date;
* Used to identify a file in the local Node Controller.
* Only used for files which are stored inside an IO device.
*/
-public final class FileReference implements Serializable {
+public class FileReference implements Serializable {
private static final long serialVersionUID = 1L;
private final File file;
private final IODeviceHandle dev;
@@ -90,7 +90,11 @@ public final class FileReference implements Serializable {
}
public FileReference getChild(String name) {
- return new FileReference(dev, path + File.separator + name);
+ return new FileReference(dev, getChildPath(name));
+ }
+
+ public String getChildPath(String name) {
+ return path + File.separator + name;
}
public void register() {
@@ -111,4 +115,8 @@ public final class FileReference implements Serializable {
}
registrationTime = 0;
}
+
+ public boolean isCompressed() {
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IPersistedResourceRegistry.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IPersistedResourceRegistry.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IPersistedResourceRegistry.java
index 38162c6..333b373 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IPersistedResourceRegistry.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IPersistedResourceRegistry.java
@@ -46,4 +46,16 @@ public interface IPersistedResourceRegistry {
* @throws HyracksDataException
*/
IJsonSerializable deserialize(JsonNode json) throws HyracksDataException;
+
+ /**
+ * This method must be used for optional fields or newly added fields to ensure backward compatibility.
+ *
+ * @param json
+ * @param clazz
+ * @return An instance of the type identified by the type id in {@code json} if it exists,
+ * or an instance of type <code>clazz</code> otherwise.
+ * @throws HyracksDataException
+ */
+ IJsonSerializable deserializeOrDefault(JsonNode json, Class<? extends IJsonSerializable> clazz)
+ throws HyracksDataException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
index 91acea0..b8cf066 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
@@ -29,6 +29,7 @@ import org.apache.hyracks.storage.am.lsm.common.impls.SynchronousSchedulerProvid
import org.apache.hyracks.storage.am.lsm.common.impls.ThreadCountingOperationTrackerFactory;
import org.apache.hyracks.storage.common.IResourceFactory;
import org.apache.hyracks.storage.common.IStorageManager;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
import org.apache.hyracks.tests.am.common.LSMTreeOperatorTestHelper;
public class LSMBTreeOperatorTestHelper extends LSMTreeOperatorTestHelper {
@@ -46,6 +47,6 @@ public class LSMBTreeOperatorTestHelper extends LSMTreeOperatorTestHelper {
NoOpIOOperationCallbackFactory.INSTANCE, pageManagerFactory, getVirtualBufferCacheProvider(),
SynchronousSchedulerProvider.INSTANCE, MERGE_POLICY_FACTORY, MERGE_POLICY_PROPERTIES, DURABLE,
bloomFilterKeyFields, LSMTreeOperatorTestHelper.DEFAULT_BLOOM_FILTER_FALSE_POSITIVE_RATE, true,
- btreefields);
+ btreefields, NoOpCompressorDecompressorFactory.INSTANCE);
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
index 4fc8af9..5d2cd26 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
@@ -1043,9 +1043,9 @@ public class BTree extends AbstractTreeIndex {
((IBTreeLeafFrame) leafFrame).setNextLeaf(leafFrontier.pageId);
- queue.put(leafFrontier.page, this);
+ putInQueue(leafFrontier.page);
for (ICachedPage c : pagesToWrite) {
- queue.put(c, this);
+ putInQueue(c);
}
pagesToWrite.clear();
splitKey.setRightPage(leafFrontier.pageId);
@@ -1152,7 +1152,7 @@ public class BTree extends AbstractTreeIndex {
ICachedPage lastLeaf = nodeFrontiers.get(level).page;
int lastLeafPage = nodeFrontiers.get(level).pageId;
lastLeaf.setDiskPageId(BufferedFileHandle.getDiskPageId(getFileId(), nodeFrontiers.get(level).pageId));
- queue.put(lastLeaf, this);
+ putInQueue(lastLeaf);
nodeFrontiers.get(level).page = null;
persistFrontiers(level + 1, lastLeafPage);
return;
@@ -1167,7 +1167,7 @@ public class BTree extends AbstractTreeIndex {
((IBTreeInteriorFrame) interiorFrame).setRightmostChildPageId(rightPage);
int finalPageId = freePageManager.takePage(metaFrame);
frontier.page.setDiskPageId(BufferedFileHandle.getDiskPageId(getFileId(), finalPageId));
- queue.put(frontier.page, this);
+ putInQueue(frontier.page);
frontier.pageId = finalPageId;
persistFrontiers(level + 1, finalPageId);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
index 97e7ed7..7d43ed8 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
@@ -33,6 +33,7 @@ import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.storage.common.buffercache.IFIFOPageQueue;
import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback;
+import org.apache.hyracks.storage.common.compression.file.ICompressedPageWriter;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
public class AppendOnlyLinkedMetadataPageManager implements IMetadataPageManager {
@@ -221,10 +222,13 @@ public class AppendOnlyLinkedMetadataPageManager implements IMetadataPageManager
}
int finalMetaPage = getMaxPageId(metaFrame) + 1;
confiscatedPage.setDiskPageId(BufferedFileHandle.getDiskPageId(fileId, finalMetaPage));
+ final ICompressedPageWriter compressedPageWriter = bufferCache.getCompressedPageWriter(fileId);
+ compressedPageWriter.prepareWrite(confiscatedPage);
// WARNING: flushing the metadata page should be done after releasing the write latch; otherwise, the page
// won't be flushed to disk because it won't be dirty until the write latch has been released.
queue.put(confiscatedPage, callback);
bufferCache.finishQueue();
+ compressedPageWriter.endWriting();
metadataPage = getMetadataPageId();
ready = false;
} else if (confiscatedPage != null) {
@@ -249,7 +253,8 @@ public class AppendOnlyLinkedMetadataPageManager implements IMetadataPageManager
}
int pages = bufferCache.getNumPagesOfFile(fileId);
if (pages == 0) {
- return 0;
+ // An index is considered non-empty only if it has at least 2 pages
+ return IBufferCache.INVALID_PAGEID;
}
metadataPage = pages - 1;
return metadataPage;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
index b77f14f..f83a27d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
@@ -39,6 +39,7 @@ import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.storage.common.buffercache.IFIFOPageQueue;
import org.apache.hyracks.storage.common.buffercache.PageWriteFailureCallback;
+import org.apache.hyracks.storage.common.compression.file.ICompressedPageWriter;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
public abstract class AbstractTreeIndex implements ITreeIndex {
@@ -241,8 +242,9 @@ public abstract class AbstractTreeIndex implements ITreeIndex {
// HDFS. Since loading this tree relies on the root page actually being at that point, no further inserts into
// that tree are allowed. Currently, this is not enforced.
protected boolean releasedLatches;
- protected final IFIFOPageQueue queue;
+ private final IFIFOPageQueue queue;
protected List<ICachedPage> pagesToWrite;
+ private final ICompressedPageWriter compressedPageWriter;
public AbstractTreeIndexBulkLoader(float fillFactor) throws HyracksDataException {
leafFrame = leafFrameFactory.createFrame();
@@ -278,10 +280,12 @@ public abstract class AbstractTreeIndex implements ITreeIndex {
nodeFrontiers.add(leafFrontier);
pagesToWrite = new ArrayList<>();
+ compressedPageWriter = bufferCache.getCompressedPageWriter(fileId);
}
- protected void handleException() throws HyracksDataException {
+ protected void handleException() {
// Unlatch and unpin pages that weren't in the queue to avoid leaking memory.
+ compressedPageWriter.abort();
for (NodeFrontier nodeFrontier : nodeFrontiers) {
ICachedPage frontierPage = nodeFrontier.page;
if (frontierPage.confiscated()) {
@@ -296,10 +300,10 @@ public abstract class AbstractTreeIndex implements ITreeIndex {
@Override
public void end() throws HyracksDataException {
- bufferCache.finishQueue();
if (hasFailed()) {
throw HyracksDataException.create(getFailure());
}
+ bufferCache.finishQueue();
freePageManager.setRootPageId(rootPage);
}
@@ -320,6 +324,12 @@ public abstract class AbstractTreeIndex implements ITreeIndex {
public void setLeafFrame(ITreeIndexFrame leafFrame) {
this.leafFrame = leafFrame;
}
+
+ public void putInQueue(ICachedPage cPage) throws HyracksDataException {
+ compressedPageWriter.prepareWrite(cPage);
+ queue.put(cPage, this);
+ }
+
}
@Override
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java
index 76f7e61..89ccfed 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java
@@ -36,6 +36,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
import org.apache.hyracks.storage.common.IStorageManager;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -55,12 +56,13 @@ public class ExternalBTreeLocalResource extends LSMBTreeLocalResource {
super(typeTraits, cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate, isPrimary, path,
storageManager, mergePolicyFactory, mergePolicyProperties, filterTypeTraits, filterCmpFactories,
btreeFields, filterFields, opTrackerProvider, ioOpCallbackFactory, metadataPageManagerFactory, null,
- ioSchedulerProvider, durable);
+ ioSchedulerProvider, durable, NoOpCompressorDecompressorFactory.INSTANCE);
}
private ExternalBTreeLocalResource(IPersistedResourceRegistry registry, JsonNode json, int[] bloomFilterKeyFields,
double bloomFilterFalsePositiveRate, boolean isPrimary, int[] btreeFields) throws HyracksDataException {
- super(registry, json, bloomFilterKeyFields, bloomFilterFalsePositiveRate, isPrimary, btreeFields);
+ super(registry, json, bloomFilterKeyFields, bloomFilterFalsePositiveRate, isPrimary, btreeFields,
+ NoOpCompressorDecompressorFactory.INSTANCE);
}
@Override
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResourceFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResourceFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResourceFactory.java
index a4c24c9..555f641 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResourceFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResourceFactory.java
@@ -30,6 +30,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
import org.apache.hyracks.storage.am.lsm.common.dataflow.LsmResource;
import org.apache.hyracks.storage.common.IStorageManager;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
public class ExternalBTreeLocalResourceFactory extends LSMBTreeLocalResourceFactory {
@@ -46,7 +47,7 @@ public class ExternalBTreeLocalResourceFactory extends LSMBTreeLocalResourceFact
super(storageManager, typeTraits, cmpFactories, filterTypeTraits, filterCmpFactories, filterFields,
opTrackerFactory, ioOpCallbackFactory, metadataPageManagerFactory, null, ioSchedulerProvider,
mergePolicyFactory, mergePolicyProperties, durable, bloomFilterKeyFields, bloomFilterFalsePositiveRate,
- isPrimary, btreeFields);
+ isPrimary, btreeFields, NoOpCompressorDecompressorFactory.INSTANCE);
}
@Override
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java
index 2a57e74..ddf0955 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java
@@ -36,6 +36,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
import org.apache.hyracks.storage.common.IStorageManager;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -58,13 +59,14 @@ public class ExternalBTreeWithBuddyLocalResource extends LSMBTreeLocalResource {
super(typeTraits, cmpFactories, buddyBtreeFields, bloomFilterFalsePositiveRate, isPrimary, path, storageManager,
mergePolicyFactory, mergePolicyProperties, filterTypeTraits, filterCmpFactories, btreeFields,
filterFields, opTrackerProvider, ioOpCallbackFactory, metadataPageManagerFactory, null,
- ioSchedulerProvider, durable);
+ ioSchedulerProvider, durable, NoOpCompressorDecompressorFactory.INSTANCE);
}
private ExternalBTreeWithBuddyLocalResource(IPersistedResourceRegistry registry, JsonNode json,
int[] bloomFilterKeyFields, double bloomFilterFalsePositiveRate, boolean isPrimary, int[] btreeFields)
throws HyracksDataException {
- super(registry, json, bloomFilterKeyFields, bloomFilterFalsePositiveRate, isPrimary, btreeFields);
+ super(registry, json, bloomFilterKeyFields, bloomFilterFalsePositiveRate, isPrimary, btreeFields,
+ NoOpCompressorDecompressorFactory.INSTANCE);
}
@Override
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResourceFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResourceFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResourceFactory.java
index 2aff61a..89e5154 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResourceFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResourceFactory.java
@@ -30,6 +30,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
import org.apache.hyracks.storage.am.lsm.common.dataflow.LsmResource;
import org.apache.hyracks.storage.common.IStorageManager;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
public class ExternalBTreeWithBuddyLocalResourceFactory extends LSMBTreeLocalResourceFactory {
@@ -46,7 +47,7 @@ public class ExternalBTreeWithBuddyLocalResourceFactory extends LSMBTreeLocalRes
super(storageManager, typeTraits, cmpFactories, filterTypeTraits, filterCmpFactories, filterFields,
opTrackerProvider, ioOpCallbackFactory, metadataPageManagerFactory, null, ioSchedulerProvider,
mergePolicyFactory, mergePolicyProperties, durable, buddyBtreeFields, bloomFilterFalsePositiveRate,
- isPrimary, btreeFields);
+ isPrimary, btreeFields, NoOpCompressorDecompressorFactory.INSTANCE);
}
@Override
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
index 40278d0..7d5beff 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.Map;
import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.compression.ICompressorDecompressorFactory;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -40,6 +41,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
import org.apache.hyracks.storage.am.lsm.common.dataflow.LsmResource;
import org.apache.hyracks.storage.common.IStorageManager;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -52,6 +54,7 @@ public class LSMBTreeLocalResource extends LsmResource {
protected final double bloomFilterFalsePositiveRate;
protected final boolean isPrimary;
protected final int[] btreeFields;
+ protected final ICompressorDecompressorFactory compressorDecompressorFactory;
public LSMBTreeLocalResource(ITypeTraits[] typeTraits, IBinaryComparatorFactory[] cmpFactories,
int[] bloomFilterKeyFields, double bloomFilterFalsePositiveRate, boolean isPrimary, String path,
@@ -60,7 +63,8 @@ public class LSMBTreeLocalResource extends LsmResource {
IBinaryComparatorFactory[] filterCmpFactories, int[] btreeFields, int[] filterFields,
ILSMOperationTrackerFactory opTrackerProvider, ILSMIOOperationCallbackFactory ioOpCallbackFactory,
IMetadataPageManagerFactory metadataPageManagerFactory, IVirtualBufferCacheProvider vbcProvider,
- ILSMIOOperationSchedulerProvider ioSchedulerProvider, boolean durable) {
+ ILSMIOOperationSchedulerProvider ioSchedulerProvider, boolean durable,
+ ICompressorDecompressorFactory compressorDecompressorFactory) {
super(path, storageManager, typeTraits, cmpFactories, filterTypeTraits, filterCmpFactories, filterFields,
opTrackerProvider, ioOpCallbackFactory, metadataPageManagerFactory, vbcProvider, ioSchedulerProvider,
mergePolicyFactory, mergePolicyProperties, durable);
@@ -68,15 +72,18 @@ public class LSMBTreeLocalResource extends LsmResource {
this.bloomFilterFalsePositiveRate = bloomFilterFalsePositiveRate;
this.isPrimary = isPrimary;
this.btreeFields = btreeFields;
+ this.compressorDecompressorFactory = compressorDecompressorFactory;
}
protected LSMBTreeLocalResource(IPersistedResourceRegistry registry, JsonNode json, int[] bloomFilterKeyFields,
- double bloomFilterFalsePositiveRate, boolean isPrimary, int[] btreeFields) throws HyracksDataException {
+ double bloomFilterFalsePositiveRate, boolean isPrimary, int[] btreeFields,
+ ICompressorDecompressorFactory compressorDecompressorFactory) throws HyracksDataException {
super(registry, json);
this.bloomFilterKeyFields = bloomFilterKeyFields;
this.bloomFilterFalsePositiveRate = bloomFilterFalsePositiveRate;
this.isPrimary = isPrimary;
this.btreeFields = btreeFields;
+ this.compressorDecompressorFactory = compressorDecompressorFactory;
}
@Override
@@ -92,7 +99,8 @@ public class LSMBTreeLocalResource extends LsmResource {
mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx),
opTrackerProvider.getOperationTracker(serviceCtx, this), ioSchedulerProvider.getIoScheduler(serviceCtx),
ioOpCallbackFactory, isPrimary, filterTypeTraits, filterCmpFactories, btreeFields, filterFields,
- durable, metadataPageManagerFactory, updateAware, serviceCtx.getTracer());
+ durable, metadataPageManagerFactory, updateAware, serviceCtx.getTracer(),
+ compressorDecompressorFactory);
}
@Override
@@ -108,8 +116,11 @@ public class LSMBTreeLocalResource extends LsmResource {
final double bloomFilterFalsePositiveRate = json.get("bloomFilterFalsePositiveRate").asDouble();
final boolean isPrimary = json.get("isPrimary").asBoolean();
final int[] btreeFields = OBJECT_MAPPER.convertValue(json.get("btreeFields"), int[].class);
+ final JsonNode compressorDecompressorNode = json.get("compressorDecompressorFactory");
+ final ICompressorDecompressorFactory compDecompFactory = (ICompressorDecompressorFactory) registry
+ .deserializeOrDefault(compressorDecompressorNode, NoOpCompressorDecompressorFactory.class);
return new LSMBTreeLocalResource(registry, json, bloomFilterKeyFields, bloomFilterFalsePositiveRate, isPrimary,
- btreeFields);
+ btreeFields, compDecompFactory);
}
@Override
@@ -120,5 +131,6 @@ public class LSMBTreeLocalResource extends LsmResource {
json.put("bloomFilterFalsePositiveRate", bloomFilterFalsePositiveRate);
json.put("isPrimary", isPrimary);
json.putPOJO("btreeFields", btreeFields);
+ json.putPOJO("compressorDecompressorFactory", compressorDecompressorFactory.toJson(registry));
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java
index 5fae5b9..ea41c3d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java
@@ -20,6 +20,7 @@ package org.apache.hyracks.storage.am.lsm.btree.dataflow;
import java.util.Map;
+import org.apache.hyracks.api.compression.ICompressorDecompressorFactory;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.api.io.FileReference;
@@ -40,6 +41,7 @@ public class LSMBTreeLocalResourceFactory extends LsmResourceFactory {
protected final double bloomFilterFalsePositiveRate;
protected final boolean isPrimary;
protected final int[] btreeFields;
+ protected final ICompressorDecompressorFactory compressorDecompressorFactory;
public LSMBTreeLocalResourceFactory(IStorageManager storageManager, ITypeTraits[] typeTraits,
IBinaryComparatorFactory[] cmpFactories, ITypeTraits[] filterTypeTraits,
@@ -48,7 +50,8 @@ public class LSMBTreeLocalResourceFactory extends LsmResourceFactory {
IMetadataPageManagerFactory metadataPageManagerFactory, IVirtualBufferCacheProvider vbcProvider,
ILSMIOOperationSchedulerProvider ioSchedulerProvider, ILSMMergePolicyFactory mergePolicyFactory,
Map<String, String> mergePolicyProperties, boolean durable, int[] bloomFilterKeyFields,
- double bloomFilterFalsePositiveRate, boolean isPrimary, int[] btreeFields) {
+ double bloomFilterFalsePositiveRate, boolean isPrimary, int[] btreeFields,
+ ICompressorDecompressorFactory compressorDecompressorFactory) {
super(storageManager, typeTraits, cmpFactories, filterTypeTraits, filterCmpFactories, filterFields,
opTrackerFactory, ioOpCallbackFactory, metadataPageManagerFactory, vbcProvider, ioSchedulerProvider,
mergePolicyFactory, mergePolicyProperties, durable);
@@ -56,6 +59,7 @@ public class LSMBTreeLocalResourceFactory extends LsmResourceFactory {
this.bloomFilterFalsePositiveRate = bloomFilterFalsePositiveRate;
this.isPrimary = isPrimary;
this.btreeFields = btreeFields;
+ this.compressorDecompressorFactory = compressorDecompressorFactory;
}
@Override
@@ -63,6 +67,6 @@ public class LSMBTreeLocalResourceFactory extends LsmResourceFactory {
return new LSMBTreeLocalResource(typeTraits, cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate,
isPrimary, fileRef.getRelativePath(), storageManager, mergePolicyFactory, mergePolicyProperties,
filterTypeTraits, filterCmpFactories, btreeFields, filterFields, opTrackerProvider, ioOpCallbackFactory,
- metadataPageManagerFactory, vbcProvider, ioSchedulerProvider, durable);
+ metadataPageManagerFactory, vbcProvider, ioSchedulerProvider, durable, compressorDecompressorFactory);
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
index 0fc79eb..9169cbf 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
@@ -21,12 +21,14 @@ package org.apache.hyracks.storage.am.lsm.btree.impls;
import java.util.HashSet;
import java.util.Set;
+import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.storage.am.btree.impls.BTree;
import org.apache.hyracks.storage.am.btree.impls.DiskBTree;
import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+import org.apache.hyracks.storage.common.compression.file.CompressedFileReference;
public class LSMBTreeDiskComponent extends AbstractLSMDiskComponent {
protected final DiskBTree btree;
@@ -80,7 +82,12 @@ public class LSMBTreeDiskComponent extends AbstractLSMDiskComponent {
static Set<String> getFiles(BTree btree) {
Set<String> files = new HashSet<>();
- files.add(btree.getFileReference().getFile().getAbsolutePath());
+ final FileReference fileRef = btree.getFileReference();
+ files.add(fileRef.getAbsolutePath());
+ if (fileRef.isCompressed()) {
+ final CompressedFileReference cFileRef = (CompressedFileReference) fileRef;
+ files.add(cFileRef.getLAFAbsolutePath());
+ }
return files;
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
index 2240fd9..ca5f968 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
@@ -27,6 +27,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import org.apache.hyracks.api.compression.ICompressorDecompressorFactory;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
@@ -37,6 +38,7 @@ import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManage
import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
import org.apache.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
public class LSMBTreeFileManager extends AbstractLSMIndexFileManager {
@@ -46,24 +48,30 @@ public class LSMBTreeFileManager extends AbstractLSMIndexFileManager {
private final boolean hasBloomFilter;
public LSMBTreeFileManager(IIOManager ioManager, FileReference file,
- TreeIndexFactory<? extends ITreeIndex> btreeFactory, boolean hasBloomFilter) {
- super(ioManager, file, null);
+ TreeIndexFactory<? extends ITreeIndex> btreeFactory, boolean hasBloomFilter,
+ ICompressorDecompressorFactory compressorDecompressorFactory) {
+ super(ioManager, file, null, compressorDecompressorFactory);
this.btreeFactory = btreeFactory;
this.hasBloomFilter = hasBloomFilter;
}
+ /**
+ * Convenience constructor for callers that do not supply a compressor/decompressor factory: it delegates to the
+ * five-argument constructor, passing {@link NoOpCompressorDecompressorFactory#INSTANCE}.
+ */
+ public LSMBTreeFileManager(IIOManager ioManager, FileReference file,
+ TreeIndexFactory<? extends ITreeIndex> btreeFactory, boolean hasBloomFilter) {
+ this(ioManager, file, btreeFactory, hasBloomFilter, NoOpCompressorDecompressorFactory.INSTANCE);
+ }
+
@Override
public LSMComponentFileReferences getRelFlushFileReference() throws HyracksDataException {
String baseName = getNextComponentSequence(btreeFilter);
- return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + BTREE_SUFFIX), null,
- hasBloomFilter ? baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX) : null);
+ return new LSMComponentFileReferences(getFileReference(baseName + DELIMITER + BTREE_SUFFIX), null,
+ hasBloomFilter ? getFileReference(baseName + DELIMITER + BLOOM_FILTER_SUFFIX) : null);
}
@Override
public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName) {
final String baseName = IndexComponentFileReference.getMergeSequence(firstFileName, lastFileName);
- return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + BTREE_SUFFIX), null,
- hasBloomFilter ? baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX) : null);
+ return new LSMComponentFileReferences(getFileReference(baseName + DELIMITER + BTREE_SUFFIX), null,
+ hasBloomFilter ? getFileReference(baseName + DELIMITER + BLOOM_FILTER_SUFFIX) : null);
}
@Override
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
index d071bac..70ac3f6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
@@ -21,6 +21,7 @@ package org.apache.hyracks.storage.am.lsm.btree.utils;
import java.util.List;
+import org.apache.hyracks.api.compression.ICompressorDecompressorFactory;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -72,8 +73,8 @@ public class LSMBTreeUtil {
ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler,
ILSMIOOperationCallbackFactory ioOpCallbackFactory, boolean needKeyDupCheck, ITypeTraits[] filterTypeTraits,
IBinaryComparatorFactory[] filterCmpFactories, int[] btreeFields, int[] filterFields, boolean durable,
- IMetadataPageManagerFactory freePageManagerFactory, boolean updateAware, ITracer tracer)
- throws HyracksDataException {
+ IMetadataPageManagerFactory freePageManagerFactory, boolean updateAware, ITracer tracer,
+ ICompressorDecompressorFactory compressorDecompressorFactory) throws HyracksDataException {
LSMBTreeTupleWriterFactory insertTupleWriterFactory =
new LSMBTreeTupleWriterFactory(typeTraits, cmpFactories.length, false, updateAware);
LSMBTreeTupleWriterFactory deleteTupleWriterFactory =
@@ -106,10 +107,10 @@ public class LSMBTreeUtil {
filterManager = new LSMComponentFilterManager(filterFrameFactory);
}
- //Primary LSMBTree index has a BloomFilter.
- ILSMIndexFileManager fileNameManager =
- new LSMBTreeFileManager(ioManager, file, diskBTreeFactory, needKeyDupCheck);
+ ILSMIndexFileManager fileNameManager = new LSMBTreeFileManager(ioManager, file, diskBTreeFactory,
+ needKeyDupCheck, compressorDecompressorFactory);
+ //Primary LSMBTree index has a BloomFilter.
ILSMDiskComponentFactory componentFactory;
ILSMDiskComponentFactory bulkLoadComponentFactory;
if (needKeyDupCheck) {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
index 904029b..7618264 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
@@ -27,6 +27,8 @@ import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
+import org.apache.hyracks.api.compression.ICompressorDecompressor;
+import org.apache.hyracks.api.compression.ICompressorDecompressorFactory;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
@@ -38,6 +40,9 @@ import org.apache.hyracks.storage.am.common.api.ITreeIndexMetadataFrame;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexFileManager;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressor;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
+import org.apache.hyracks.storage.common.compression.file.CompressedFileReference;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
import org.apache.hyracks.util.annotations.NotThreadSafe;
@@ -71,6 +76,10 @@ public abstract class AbstractLSMIndexFileManager implements ILSMIndexFileManage
*/
public static final String DELETE_TREE_SUFFIX = "d";
/**
+ * Indicates Look Aside File (LAF) for compressed indexes
+ */
+ public static final String LAF_SUFFIX = ".dic";
+ /**
* Hides transaction components until they are either committed by removing this file or deleted along with the file
*/
public static final String TXN_PREFIX = ".T";
@@ -88,12 +97,20 @@ public abstract class AbstractLSMIndexFileManager implements ILSMIndexFileManage
protected final Comparator<IndexComponentFileReference> recencyCmp = new RecencyComparator();
protected final TreeIndexFactory<? extends ITreeIndex> treeFactory;
private long lastUsedComponentSeq = UNINITALIZED_COMPONENT_SEQ;
+ private final ICompressorDecompressorFactory compressorDecompressorFactory;
public AbstractLSMIndexFileManager(IIOManager ioManager, FileReference file,
TreeIndexFactory<? extends ITreeIndex> treeFactory) {
+ this(ioManager, file, treeFactory, NoOpCompressorDecompressorFactory.INSTANCE);
+ }
+
+ public AbstractLSMIndexFileManager(IIOManager ioManager, FileReference file,
+ TreeIndexFactory<? extends ITreeIndex> treeFactory,
+ ICompressorDecompressorFactory compressorDecompressorFactory) {
this.ioManager = ioManager;
this.baseDir = file;
this.treeFactory = treeFactory;
+ this.compressorDecompressorFactory = compressorDecompressorFactory;
}
protected TreeIndexState isValidTreeIndex(ITreeIndex treeIndex) throws HyracksDataException {
@@ -131,7 +148,7 @@ public abstract class AbstractLSMIndexFileManager implements ILSMIndexFileManage
IBufferCache bufferCache) throws HyracksDataException {
String[] files = listDirFiles(baseDir, filter);
for (String fileName : files) {
- FileReference fileRef = baseDir.getChild(fileName);
+ FileReference fileRef = getFileReference(fileName);
if (treeFactory == null) {
allFiles.add(IndexComponentFileReference.of(fileRef));
continue;
@@ -362,6 +379,21 @@ public abstract class AbstractLSMIndexFileManager implements ILSMIndexFileManage
return IndexComponentFileReference.getFlushSequence(++lastUsedComponentSeq);
}
+    /**
+     * Builds the {@link FileReference} of a component file located under {@code baseDir}.
+     * <p>
+     * A {@link CompressedFileReference} (data path plus a look-aside path ending in {@link #LAF_SUFFIX}) is returned
+     * only when the configured factory yields something other than {@link NoOpCompressorDecompressor#INSTANCE} and
+     * the file name passes {@link #isCompressible(String)}; otherwise the plain child reference is returned.
+     *
+     * @param name
+     *            file name relative to {@code baseDir}
+     * @return the reference to use for that file
+     */
+    protected FileReference getFileReference(String name) {
+        final ICompressorDecompressor compressor = compressorDecompressorFactory.createInstance();
+        // The no-op compressor must never get a LAF file, so it is treated like an uncompressed file
+        final boolean useCompression = compressor != NoOpCompressorDecompressor.INSTANCE && isCompressible(name);
+        if (!useCompression) {
+            return baseDir.getChild(name);
+        }
+
+        final String dataPath = baseDir.getChildPath(name);
+        final String lafPath = dataPath + LAF_SUFFIX;
+        return new CompressedFileReference(baseDir.getDeviceHandle(), compressor, dataPath, lafPath);
+    }
+
+ private boolean isCompressible(String fileName) {
+ return !fileName.endsWith(BLOOM_FILTER_SUFFIX) && !fileName.endsWith(DELETE_TREE_SUFFIX);
+ }
+
private long getOnDiskLastUsedComponentSequence(FilenameFilter filenameFilter) throws HyracksDataException {
long maxComponentSeq = -1;
final String[] files = listDirFiles(baseDir, filenameFilter);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
index 635fe7a..b34a13c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
@@ -941,9 +941,10 @@ public class RTree extends AbstractTreeIndex {
propagateBulk(1, false, pagesToWrite);
leafFrontier.pageId = freePageManager.takePage(metaFrame);
- queue.put(leafFrontier.page, this);
+
+ putInQueue(leafFrontier.page);
for (ICachedPage c : pagesToWrite) {
- queue.put(c, this);
+ putInQueue(c);
}
pagesToWrite.clear();
leafFrontier.page = bufferCache
@@ -974,7 +975,7 @@ public class RTree extends AbstractTreeIndex {
}
for (ICachedPage c : pagesToWrite) {
- queue.put(c, this);
+ putInQueue(c);
}
finish();
super.end();
@@ -1011,7 +1012,7 @@ public class RTree extends AbstractTreeIndex {
((RTreeNSMFrame) lowerFrame).adjustMBR();
interiorFrameTupleWriter.writeTupleFields(((RTreeNSMFrame) lowerFrame).getMBRTuples(), 0, mbr, 0);
}
- queue.put(n.page, this);
+ putInQueue(n.page);
n.page = null;
prevPageId = n.pageId;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml
index 9c50f2d..423925b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml
@@ -66,5 +66,10 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ <version>1.1.7.1</version>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java
new file mode 100644
index 0000000..47e7534
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.buffercache;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.storage.common.compression.file.CompressedFileReference;
+import org.apache.hyracks.storage.common.compression.file.ICompressedPageWriter;
+import org.apache.hyracks.util.annotations.NotThreadSafe;
+
+/**
+ * Handles all IO operations for a specified file.
+ * <p>
+ * The class wraps the {@link IIOManager} calls issued against a single {@link IFileHandle} (open, close, sync,
+ * positional reads and writes) and leaves the page layout to subclasses, which implement
+ * {@link #read(CachedPage)}, {@link #write(CachedPage, BufferCacheHeaderHelper, int, int)}, the page-offset
+ * computations and {@link #getNumberOfPages()}.
+ */
+@NotThreadSafe
+public abstract class AbstractBufferedFileIOManager {
+    // Template arguments: operation, expected byte count, actual byte count, absolute file path
+    private static final String ERROR_MESSAGE = "%s unexpected number of bytes: [expected: %d, actual: %d, file: %s]";
+    private static final String READ = "Read";
+    private static final String WRITE = "Written";
+
+    protected final BufferCache bufferCache;
+    protected final IPageReplacementStrategy pageReplacementStrategy;
+    // Pool of reusable header helpers; see checkoutHeaderHelper() / returnHeaderHelper()
+    private final BlockingQueue<BufferCacheHeaderHelper> headerPageCache;
+    private final IIOManager ioManager;
+
+    // null until open() succeeds and again after markAsDeleted()
+    private IFileHandle fileHandle;
+    // Set by open() and never reset in this class, i.e. "has ever been opened"
+    private volatile boolean hasOpen;
+
+    /**
+     * @param bufferCache
+     *            buffer cache that owns this manager; also supplies the page size for new header helpers
+     * @param ioManager
+     *            IO manager through which every file operation is issued
+     * @param headerPageCache
+     *            pool of reusable {@link BufferCacheHeaderHelper}s
+     * @param pageReplacementStrategy
+     *            page replacement strategy, kept for subclasses
+     */
+    protected AbstractBufferedFileIOManager(BufferCache bufferCache, IIOManager ioManager,
+            BlockingQueue<BufferCacheHeaderHelper> headerPageCache, IPageReplacementStrategy pageReplacementStrategy) {
+        this.bufferCache = bufferCache;
+        this.ioManager = ioManager;
+        this.headerPageCache = headerPageCache;
+        this.pageReplacementStrategy = pageReplacementStrategy;
+        hasOpen = false;
+    }
+
+    /* ********************************
+     * Read/Write page methods
+     * ********************************
+     */
+
+    /**
+     * Read the CachedPage from disk
+     *
+     * @param cPage
+     *            CachedPage in {@link BufferCache}
+     * @throws HyracksDataException
+     */
+    public abstract void read(CachedPage cPage) throws HyracksDataException;
+
+    /**
+     * Write the CachedPage into disk.
+     * <p>
+     * Checks out a {@link BufferCacheHeaderHelper} and delegates to
+     * {@link #write(CachedPage, BufferCacheHeaderHelper, int, int)}. The helper is not returned to the pool here.
+     *
+     * @param cPage
+     *            CachedPage in {@link BufferCache}
+     * @throws HyracksDataException
+     */
+    public void write(CachedPage cPage) throws HyracksDataException {
+        final int totalPages = cPage.getFrameSizeMultiplier();
+        final int extraBlockPageId = cPage.getExtraBlockPageId();
+        final BufferCacheHeaderHelper header = checkoutHeaderHelper();
+        write(cPage, header, totalPages, extraBlockPageId);
+    }
+
+    /**
+     * Write the CachedPage into disk called by {@link AbstractBufferedFileIOManager#write(CachedPage)}
+     * Note: {@link AbstractBufferedFileIOManager#write(CachedPage)} does not hand the
+     * {@link BufferCacheHeaderHelper} back after this call, so the helper has to be returned through
+     * {@link #returnHeaderHelper(BufferCacheHeaderHelper)} by the implementation.
+     * NOTE(review): the original wording assigned this to "the caller" - confirm against the subclasses.
+     *
+     * @param cPage
+     *            CachedPage that will be written
+     * @param header
+     *            HeaderHelper to add into the written page
+     * @param totalPages
+     *            Number of pages to be written
+     * @param extraBlockPageId
+     *            Extra page ID in case it has more than one page
+     * @throws HyracksDataException
+     */
+    protected abstract void write(CachedPage cPage, BufferCacheHeaderHelper header, int totalPages,
+            int extraBlockPageId) throws HyracksDataException;
+
+    /* ********************************
+     * File operations' methods
+     * ********************************
+     */
+
+    /**
+     * Open the file in read/write mode with asynchronous metadata and data syncing, and remember that this manager
+     * has been opened.
+     *
+     * @param fileRef
+     *            reference of the file to open
+     * @throws HyracksDataException
+     */
+    public void open(FileReference fileRef) throws HyracksDataException {
+        fileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE,
+                IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+        hasOpen = true;
+    }
+
+    /**
+     * Close the file. Does nothing if {@link #open(FileReference)} has never succeeded on this manager.
+     *
+     * @throws HyracksDataException
+     */
+    public void close() throws HyracksDataException {
+        if (hasOpen) {
+            ioManager.close(fileHandle);
+        }
+    }
+
+    /**
+     * Close the file handle unconditionally; unlike {@link #close()} there is no check that the file was opened.
+     *
+     * @throws HyracksDataException
+     */
+    public void purge() throws HyracksDataException {
+        ioManager.close(fileHandle);
+    }
+
+    /**
+     * Force the file into disk
+     *
+     * @param metadata
+     *            see {@link java.nio.channels.FileChannel#force(boolean)}
+     * @throws HyracksDataException
+     */
+    public void force(boolean metadata) throws HyracksDataException {
+        ioManager.sync(fileHandle, metadata);
+    }
+
+    /**
+     * Get the number of pages in the file
+     *
+     * @throws HyracksDataException
+     */
+    public abstract int getNumberOfPages() throws HyracksDataException;
+
+    /**
+     * Mark the file as deleted by dropping the file handle; nothing is removed from disk here.
+     *
+     * @throws HyracksDataException
+     */
+    public void markAsDeleted() throws HyracksDataException {
+        fileHandle = null;
+    }
+
+    /**
+     * Check whether the file has been deleted
+     * <p>
+     * The check is "no file handle", so it also holds for a manager on which {@link #open(FileReference)} has not
+     * been called yet; combine with {@link #hasBeenOpened()} to tell the two cases apart.
+     *
+     * @return
+     *         true if has been deleted, false otherwise
+     */
+    public boolean hasBeenDeleted() {
+        return fileHandle == null;
+    }
+
+    /**
+     * Check whether the file has ever been opened
+     *
+     * @return
+     *         true if has ever been open, false otherwise
+     */
+    public final boolean hasBeenOpened() {
+        return hasOpen;
+    }
+
+    /**
+     * Get the reference of the currently held file handle.
+     *
+     * @return the file reference of the open file
+     * @throws NullPointerException
+     *             if there is no file handle (never opened, or marked as deleted)
+     */
+    public final FileReference getFileReference() {
+        return fileHandle.getFileReference();
+    }
+
+    /**
+     * Create the file on disk. For a compressed file reference the look-aside (LAF) file is created as well, through
+     * the buffer cache; if that fails, the data file created a moment earlier is deleted again before rethrowing.
+     *
+     * @param bufferCache
+     *            buffer cache used to create the LAF file
+     * @param fileRef
+     *            reference of the file to create
+     * @throws HyracksDataException
+     */
+    public static void createFile(BufferCache bufferCache, FileReference fileRef) throws HyracksDataException {
+        IoUtil.create(fileRef);
+        if (fileRef.isCompressed()) {
+            final CompressedFileReference cFileRef = (CompressedFileReference) fileRef;
+            try {
+                bufferCache.createFile(cFileRef.getLAFFileReference());
+            } catch (HyracksDataException e) {
+                //In case of creating the LAF file failed, delete fileRef
+                IoUtil.delete(fileRef);
+                throw e;
+            }
+        }
+    }
+
+    /**
+     * Delete the file from disk. For a compressed file reference the look-aside (LAF) file is deleted as well, when
+     * it exists.
+     *
+     * @param fileRef
+     *            reference of the file to delete
+     * @throws HyracksDataException
+     */
+    public static void deleteFile(FileReference fileRef) throws HyracksDataException {
+        IoUtil.delete(fileRef);
+        if (fileRef.isCompressed()) {
+            final FileReference lafFileRef = ((CompressedFileReference) fileRef).getLAFFileReference();
+            // The data file was removed just above, so the existence check has to target the LAF file itself;
+            // checking the data file would always be false and leave the LAF file orphaned on disk.
+            if (lafFileRef.getFile().exists()) {
+                IoUtil.delete(lafFileRef);
+            }
+        }
+    }
+
+    /* ********************************
+     * Compressed file methods
+     * ********************************
+     */
+
+    public abstract ICompressedPageWriter getCompressedPageWriter();
+
+    /* ********************************
+     * Common helper methods
+     * ********************************
+     */
+
+    /**
+     * Get the offset for the first page
+     *
+     * @param cPage
+     *            CachedPage for which the offset is needed
+     * @return
+     *         page offset in the file
+     */
+    protected abstract long getFirstPageOffset(CachedPage cPage);
+
+    /**
+     * Get the offset for the extra page
+     *
+     * @param cPage
+     *            CachedPage for which the offset is needed
+     * @return
+     *         page offset in the file
+     */
+    protected abstract long getExtraPageOffset(CachedPage cPage);
+
+    /**
+     * Take a header helper from the pool, allocating a new one sized to the buffer cache page size when the pool
+     * is empty.
+     *
+     * @return a header helper, never null
+     */
+    protected final BufferCacheHeaderHelper checkoutHeaderHelper() {
+        BufferCacheHeaderHelper helper = headerPageCache.poll();
+        if (helper == null) {
+            helper = new BufferCacheHeaderHelper(bufferCache.getPageSize());
+        }
+        return helper;
+    }
+
+    /**
+     * Offer a header helper back to the pool. The result of the offer is deliberately ignored, so a helper that
+     * the queue does not accept is simply dropped.
+     *
+     * @param buffer
+     *            helper obtained from {@link #checkoutHeaderHelper()}
+     */
+    protected final void returnHeaderHelper(BufferCacheHeaderHelper buffer) {
+        headerPageCache.offer(buffer); //NOSONAR
+    }
+
+    /**
+     * Synchronously read from the file into {@code buf}, starting at {@code offset}.
+     *
+     * @return the value reported by {@link IIOManager#syncRead}
+     */
+    protected final long readToBuffer(ByteBuffer buf, long offset) throws HyracksDataException {
+        return ioManager.syncRead(fileHandle, offset, buf);
+    }
+
+    /**
+     * Synchronously write {@code buf} to the file at {@code offset}.
+     *
+     * @return the value reported by {@link IIOManager#syncWrite}
+     */
+    protected final long writeToFile(ByteBuffer buf, long offset) throws HyracksDataException {
+        return ioManager.syncWrite(fileHandle, offset, buf);
+    }
+
+    /**
+     * Synchronously write an array of buffers to the file at {@code offset}.
+     *
+     * @return the value reported by {@link IIOManager#syncWrite}
+     */
+    protected final long writeToFile(ByteBuffer[] buf, long offset) throws HyracksDataException {
+        return ioManager.syncWrite(fileHandle, offset, buf);
+    }
+
+    /**
+     * @return the size of the file as reported by the IO manager
+     */
+    protected final long getFileSize() {
+        return ioManager.getSize(fileHandle);
+    }
+
+    /**
+     * Fail with an {@link IllegalStateException} when the number of bytes written differs from the expected one.
+     */
+    protected final void verifyBytesWritten(long expected, long actual) {
+        if (expected != actual) {
+            throwException(WRITE, expected, actual);
+        }
+    }
+
+    /**
+     * Check the number of bytes read against the expected one.
+     *
+     * @return true when the counts match; false when they differ and {@code actual} is -1. Any other mismatch is
+     *         reported through {@link #throwException(String, long, long)}.
+     */
+    protected final boolean verifyBytesRead(long expected, long actual) {
+        if (expected != actual) {
+            if (actual == -1) {
+                // disk order scan code seems to rely on this behavior, so silently return
+                return false;
+            } else {
+                throwException(READ, expected, actual);
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Throw an {@link IllegalStateException} describing a byte-count mismatch for the current file.
+     *
+     * @param op
+     *            operation label placed at the start of the message
+     * @param expected
+     *            expected number of bytes
+     * @param actual
+     *            actual number of bytes
+     */
+    protected void throwException(String op, long expected, long actual) {
+        final String path = fileHandle.getFileReference().getAbsolutePath();
+        throw new IllegalStateException(String.format(ERROR_MESSAGE, op, expected, actual, path));
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index 1e3f85b..7441395 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -22,7 +22,6 @@ import static org.apache.hyracks.control.nc.io.IOManager.IO_REQUEST_QUEUE_SIZE;
import java.io.IOException;
import java.io.OutputStream;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -42,12 +41,12 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.io.IFileHandle;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
import org.apache.hyracks.api.replication.IIOReplicationManager;
import org.apache.hyracks.api.util.ExceptionUtils;
import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.storage.common.compression.file.ICompressedPageWriter;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
import org.apache.hyracks.storage.common.file.IFileMapManager;
import org.apache.logging.log4j.Level;
@@ -80,18 +79,18 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
private final BlockingQueue<BufferCacheHeaderHelper> headerPageCache =
new ArrayBlockingQueue<>(IO_REQUEST_QUEUE_SIZE);
+ private IIOReplicationManager ioReplicationManager;
+ private final List<ICachedPageInternal> cachedPages = new ArrayList<>();
+ private final AtomicLong masterPinCount = new AtomicLong();
+
+ private boolean closed;
+
//DEBUG
private static final Level fileOpsLevel = Level.TRACE;
private ArrayList<CachedPage> confiscatedPages;
private Lock confiscateLock;
private HashMap<CachedPage, StackTraceElement[]> confiscatedPagesOwner;
private ConcurrentHashMap<CachedPage, StackTraceElement[]> pinnedPageOwner;
- //!DEBUG
- private IIOReplicationManager ioReplicationManager;
- private final List<ICachedPageInternal> cachedPages = new ArrayList<>();
- private final AtomicLong masterPinCount = new AtomicLong();
-
- private boolean closed;
public BufferCache(IIOManager ioManager, IPageReplacementStrategy pageReplacementStrategy,
IPageCleanerPolicy pageCleanerPolicy, IFileMapManager fileMapManager, int maxOpenFiles,
@@ -158,7 +157,7 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
synchronized (fileInfoMap) {
fInfo = fileInfoMap.get(fileId);
}
- if (fInfo == null) {
+ if (fInfo == null || fInfo.hasBeenDeleted() || !fInfo.hasBeenOpened()) {
throw new HyracksDataException("pin called on a fileId " + fileId + " that has not been created.");
} else if (fInfo.getReferenceCount() <= 0) {
throw new HyracksDataException("pin called on a fileId " + fileId + " that has not been opened.");
@@ -546,35 +545,9 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
}
private void read(CachedPage cPage) throws HyracksDataException {
- BufferedFileHandle fInfo = getFileInfo(cPage);
+ BufferedFileHandle fInfo = getFileHandle(cPage);
cPage.buffer.clear();
- BufferCacheHeaderHelper header = checkoutHeaderHelper();
- try {
- long bytesRead = ioManager.syncRead(fInfo.getFileHandle(),
- getOffsetForPage(BufferedFileHandle.getPageId(cPage.dpid)), header.prepareRead());
-
- if (bytesRead != getPageSizeWithHeader()) {
- if (bytesRead == -1) {
- // disk order scan code seems to rely on this behavior, so silently return
- return;
- }
- throw new HyracksDataException("Failed to read a complete page: " + bytesRead);
- }
- int totalPages = header.processRead(cPage);
-
- if (totalPages > 1) {
- pageReplacementStrategy.fixupCapacityOnLargeRead(cPage);
- cPage.buffer.position(pageSize);
- cPage.buffer.limit(totalPages * pageSize);
- ioManager.syncRead(fInfo.getFileHandle(), getOffsetForPage(cPage.getExtraBlockPageId()), cPage.buffer);
- }
- } finally {
- returnHeaderHelper(header);
- }
- }
-
- private long getOffsetForPage(long pageId) {
- return pageId * getPageSizeWithHeader();
+ fInfo.read(cPage);
}
@Override
@@ -583,67 +556,16 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
pageReplacementStrategy.resizePage((ICachedPageInternal) cPage, totalPages, extraPageBlockHelper);
}
- BufferedFileHandle getFileInfo(CachedPage cPage) throws HyracksDataException {
- return getFileInfo(BufferedFileHandle.getFileId(cPage.dpid));
- }
-
- BufferedFileHandle getFileInfo(int fileId) throws HyracksDataException {
- BufferedFileHandle fInfo;
- synchronized (fileInfoMap) {
- fInfo = fileInfoMap.get(fileId);
- }
- if (fInfo == null) {
- throw HyracksDataException.create(ErrorCode.FILE_DOES_NOT_EXIST, fileId);
- }
- return fInfo;
- }
-
- private BufferCacheHeaderHelper checkoutHeaderHelper() {
- BufferCacheHeaderHelper helper = headerPageCache.poll();
- if (helper == null) {
- helper = new BufferCacheHeaderHelper(pageSize);
- }
- return helper;
- }
-
- private void returnHeaderHelper(BufferCacheHeaderHelper buffer) {
- headerPageCache.offer(buffer);
- }
-
void write(CachedPage cPage) throws HyracksDataException {
- BufferedFileHandle fInfo = getFileInfo(cPage);
+ BufferedFileHandle fInfo = getFileHandle(cPage);
// synchronize on fInfo to prevent the file handle from being deleted until the page is written.
synchronized (fInfo) {
- if (fInfo.fileHasBeenDeleted()) {
+ if (fInfo.hasBeenDeleted()) {
return;
}
- ByteBuffer buf = cPage.buffer.duplicate();
- final int totalPages = cPage.getFrameSizeMultiplier();
- final int extraBlockPageId = cPage.getExtraBlockPageId();
- final boolean contiguousLargePages = (BufferedFileHandle.getPageId(cPage.dpid) + 1) == extraBlockPageId;
- BufferCacheHeaderHelper header = checkoutHeaderHelper();
- try {
- buf.limit(contiguousLargePages ? pageSize * totalPages : pageSize);
- buf.position(0);
- long bytesWritten = ioManager.syncWrite(fInfo.getFileHandle(),
- getOffsetForPage(BufferedFileHandle.getPageId(cPage.dpid)), header.prepareWrite(cPage, buf));
-
- if (bytesWritten != (contiguousLargePages ? pageSize * (totalPages - 1) : 0)
- + getPageSizeWithHeader()) {
- throw new HyracksDataException("Failed to write completely: " + bytesWritten);
- }
- } finally {
- returnHeaderHelper(header);
- }
- if (totalPages > 1 && !contiguousLargePages) {
- buf.limit(totalPages * pageSize);
- ioManager.syncWrite(fInfo.getFileHandle(), getOffsetForPage(extraBlockPageId), buf);
- }
- if (buf.capacity() != pageSize * totalPages) {
- throw new IllegalStateException("Illegal number of bytes written, expected bytes written: "
- + pageSize * totalPages + " actual bytes writte: " + buf.capacity());
- }
+ fInfo.write(cPage);
}
+
}
@Override
@@ -794,8 +716,8 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
synchronized (fileInfoMap) {
fileInfoMap.forEach((key, value) -> {
try {
- sweepAndFlush(key, true);
- ioManager.close(value.getFileHandle());
+ sweepAndFlush(value, true);
+ value.close();
} catch (HyracksDataException e) {
if (LOGGER.isWarnEnabled()) {
LOGGER.log(Level.WARN, "Error flushing file id: " + key, e);
@@ -811,11 +733,14 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
if (LOGGER.isEnabled(fileOpsLevel)) {
LOGGER.log(fileOpsLevel, "Creating file: " + fileRef + " in cache: " + this);
}
- IoUtil.create(fileRef);
+ BufferedFileHandle.createFile(this, fileRef);
+ int fileId;
try {
synchronized (fileInfoMap) {
- return fileMapManager.registerFile(fileRef);
+ fileId = fileMapManager.registerFile(fileRef);
+ getOrCreateFileHandle(fileId);
}
+ return fileId;
} catch (Exception e) {
// If file registration failed for any reason, we need to undo the file creation
try {
@@ -851,11 +776,14 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
}
try {
final BufferedFileHandle fInfo = getOrCreateFileHandle(fileId);
- if (fInfo.getFileHandle() == null) {
+ //CompressedFileReference may open another file which may sweep and close out this fInfo
+ fInfo.incReferenceCount();
+
+ if (!fInfo.hasBeenOpened()) {
// a new file
synchronized (fInfo) {
// prevent concurrent opening of the same file
- if (fInfo.getFileHandle() == null) {
+ if (!fInfo.hasBeenOpened()) {
if (fileInfoMap.size() > maxOpenFiles) {
closeOpeningFiles(fInfo);
}
@@ -864,15 +792,12 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
synchronized (fileInfoMap) {
fileRef = fileMapManager.lookupFileName(fileId);
}
- IFileHandle fh = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE,
- IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
- fInfo.setFileHandle(fh);
+ fInfo.open(fileRef);
}
}
}
- fInfo.incReferenceCount();
} catch (Exception e) {
- removeFileInfo(fileId);
+ removeFileHandle(fileId);
throw HyracksDataException.create(e);
}
}
@@ -888,11 +813,11 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
if (fh != newFileHandle && fh.getReferenceCount() <= 0) {
if (fh.getReferenceCount() < 0) {
throw new IllegalStateException("Illegal reference count " + fh.getReferenceCount()
- + " of file " + fh.getFileHandle().getFileReference());
+ + " of file " + fh.getFileReference());
}
int entryFileId = entry.getKey();
- sweepAndFlush(entryFileId, true);
- ioManager.close(entry.getValue().getFileHandle());
+ sweepAndFlush(fh, true);
+ entry.getValue().close();
fileInfoMap.remove(entryFileId);
unreferencedFileFound = true;
// for-each iterator is invalid because we changed
@@ -908,7 +833,12 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
}
}
- private void sweepAndFlush(int fileId, boolean flushDirtyPages) throws HyracksDataException {
+ private void sweepAndFlush(BufferedFileHandle fInfo, boolean flushDirtyPages) throws HyracksDataException {
+ if (!fInfo.hasBeenOpened()) {
+ //Skip flushing as the file has not been opened
+ return;
+ }
+ final int fileId = fInfo.getFileId();
for (final CacheBucket bucket : pageMap) {
bucket.bucketLock.lock();
try {
@@ -973,7 +903,7 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
synchronized (fileInfoMap) {
BufferedFileHandle fInfo = fileInfoMap.get(fileId);
- if (fInfo == null) {
+ if (fInfo == null || !fInfo.hasBeenOpened()) {
throw new HyracksDataException("Closing unopened file");
}
if (fInfo.decReferenceCount() < 0) {
@@ -997,7 +927,7 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
synchronized (fileInfoMap) {
fInfo = fileInfoMap.get(fileId);
}
- ioManager.sync(fInfo.getFileHandle(), metadata);
+ fInfo.force(metadata);
}
@Override
@@ -1013,7 +943,7 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
if (mapped) {
deleteFile(fileId);
} else {
- IoUtil.delete(fileRef);
+ BufferedFileHandle.deleteFile(fileRef);
}
}
@@ -1022,11 +952,11 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
if (LOGGER.isEnabled(fileOpsLevel)) {
LOGGER.log(fileOpsLevel, "Deleting file: " + fileId + " in cache: " + this);
}
- BufferedFileHandle fInfo = removeFileInfo(fileId);
+ BufferedFileHandle fInfo = removeFileHandle(fileId);
if (fInfo == null) {
return;
}
- sweepAndFlush(fileId, false);
+ sweepAndFlush(fInfo, false);
try {
if (fInfo.getReferenceCount() > 0) {
throw new HyracksDataException("Deleting open file");
@@ -1040,11 +970,11 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
} finally {
try {
synchronized (fInfo) {
- ioManager.close(fInfo.getFileHandle());
+ fInfo.close();
fInfo.markAsDeleted();
}
} finally {
- IoUtil.delete(fileRef);
+ BufferedFileHandle.deleteFile(fileRef);
}
}
}
@@ -1178,10 +1108,7 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
if (fInfo == null) {
throw new HyracksDataException("No such file mapped for fileId:" + fileId);
}
- if (DEBUG) {
- assert ioManager.getSize(fInfo.getFileHandle()) % getPageSizeWithHeader() == 0;
- }
- return (int) (ioManager.getSize(fInfo.getFileHandle()) / getPageSizeWithHeader());
+ return fInfo.getNumberOfPages();
}
}
@@ -1290,18 +1217,35 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
return null;
}
+ /**
+  * Returns the handle registered in {@code fileInfoMap} for {@code fileId}, creating and
+  * registering a new one through {@code BufferedFileHandle.create} when none is present.
+  * The file reference is looked up from {@code fileMapManager} on every call, before the map
+  * is consulted, and the whole operation runs while holding the {@code fileInfoMap} monitor.
+  *
+  * @param fileId id of the file whose handle is requested
+  * @return the existing or newly created handle
+  * @throws HyracksDataException presumably when {@code fileMapManager.lookupFileName} cannot
+  *         resolve {@code fileId} -- NOTE(review): confirm against IFileMapManager
+  */
- private BufferedFileHandle getOrCreateFileHandle(int fileId) {
+ private BufferedFileHandle getOrCreateFileHandle(int fileId) throws HyracksDataException {
synchronized (fileInfoMap) {
- return fileInfoMap.computeIfAbsent(fileId, id -> new BufferedFileHandle(fileId, null));
+ final FileReference fileRef = fileMapManager.lookupFileName(fileId);
+ return fileInfoMap.computeIfAbsent(fileId, id -> BufferedFileHandle.create(fileRef, fileId, this, ioManager,
+ headerPageCache, pageReplacementStrategy));
}
}
+ /**
+  * Removes the handle registered for {@code fileId} from {@code fileInfoMap} while holding
+  * the map's monitor.
+  *
+  * @param fileId id of the file whose handle should be dropped from the map
+  * @return the handle that was mapped to {@code fileId}, or {@code null} if there was none
+  */
- private BufferedFileHandle removeFileInfo(int fileId) {
+ private BufferedFileHandle removeFileHandle(int fileId) {
synchronized (fileInfoMap) {
return fileInfoMap.remove(fileId);
}
}
+ /**
+  * Resolves the file handle that owns the given cached page. The owning file id is
+  * extracted from the page's {@code dpid}.
+  *
+  * @param cPage the cached page whose file handle is wanted
+  * @return the handle registered for the page's file id
+  * @throws HyracksDataException if no handle is registered for that file id
+  */
+ private BufferedFileHandle getFileHandle(CachedPage cPage) throws HyracksDataException {
+     final int owningFileId = BufferedFileHandle.getFileId(cPage.dpid);
+     return getFileHandle(owningFileId);
+ }
+
+ /**
+  * Looks up the handle registered in {@code fileInfoMap} for {@code fileId}. Only the map
+  * access itself is performed under the {@code fileInfoMap} monitor.
+  *
+  * @param fileId id of the file whose handle is wanted
+  * @return the registered handle, never {@code null}
+  * @throws HyracksDataException with {@code ErrorCode.FILE_DOES_NOT_EXIST} if no handle is
+  *         registered for {@code fileId}
+  */
+ private BufferedFileHandle getFileHandle(int fileId) throws HyracksDataException {
+     final BufferedFileHandle handle;
+     synchronized (fileInfoMap) {
+         handle = fileInfoMap.get(fileId);
+     }
+     if (handle != null) {
+         return handle;
+     }
+     throw HyracksDataException.create(ErrorCode.FILE_DOES_NOT_EXIST, fileId);
+ }
+
private ICachedPage getPageLoop(long dpid, int multiplier, boolean confiscate) throws HyracksDataException {
final long startingPinCount = DEBUG ? masterPinCount.get() : -1;
int cycleCount = 0;
@@ -1442,54 +1386,16 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
+ /**
+  * Drops the handle registered for {@code fileId} from this cache. When a handle was
+  * registered, the file id is also unregistered from {@code fileMapManager} and the handle's
+  * {@code purge()} is invoked, both while holding the {@code fileInfoMap} monitor. Does
+  * nothing when no handle is registered for {@code fileId}.
+  * NOTE(review): {@code purge()} replaces the former {@code ioManager.close(...)} call and now
+  * runs inside the lock; presumably it closes the underlying file(s) -- confirm in
+  * BufferedFileHandle.
+  */
@Override
public void purgeHandle(int fileId) throws HyracksDataException {
- BufferedFileHandle fh = removeFileInfo(fileId);
+ BufferedFileHandle fh = removeFileHandle(fileId);
if (fh != null) {
synchronized (fileInfoMap) {
fileMapManager.unregisterFile(fileId);
+ fh.purge();
}
- ioManager.close(fh.getFileHandle());
}
}
- static class BufferCacheHeaderHelper {
- private static final int FRAME_MULTIPLIER_OFF = 0;
- private static final int EXTRA_BLOCK_PAGE_ID_OFF = FRAME_MULTIPLIER_OFF + 4; // 4
-
- private final ByteBuffer buf;
- private final ByteBuffer[] array;
-
- private BufferCacheHeaderHelper(int pageSize) {
- buf = ByteBuffer.allocate(RESERVED_HEADER_BYTES + pageSize);
- array = new ByteBuffer[] { buf, null };
- }
-
- private ByteBuffer[] prepareWrite(CachedPage cPage, ByteBuffer pageBuffer) {
- buf.position(0);
- buf.limit(RESERVED_HEADER_BYTES);
- buf.putInt(FRAME_MULTIPLIER_OFF, cPage.getFrameSizeMultiplier());
- buf.putInt(EXTRA_BLOCK_PAGE_ID_OFF, cPage.getExtraBlockPageId());
- array[1] = pageBuffer;
- return array;
- }
-
- private ByteBuffer prepareRead() {
- buf.position(0);
- buf.limit(buf.capacity());
- return buf;
- }
-
- private int processRead(CachedPage cPage) {
- buf.position(RESERVED_HEADER_BYTES);
- cPage.buffer.position(0);
- cPage.buffer.put(buf);
- int multiplier = buf.getInt(FRAME_MULTIPLIER_OFF);
- cPage.setFrameSizeMultiplier(multiplier);
- cPage.setExtraBlockPageId(buf.getInt(EXTRA_BLOCK_PAGE_ID_OFF));
- return multiplier;
- }
- }
-
@Override
public void closeFileIfOpen(FileReference fileRef) {
synchronized (fileInfoMap) {
@@ -1508,4 +1414,14 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
}
}
+ /**
+  * Returns the compressed page writer of the file handle registered for {@code fileId}.
+  * Only the map lookup is performed under the {@code fileInfoMap} monitor.
+  *
+  * @param fileId id of a file that has a handle registered in this buffer cache
+  * @return the result of {@code getCompressedPageWriter()} on that file's handle
+  * @throws IllegalStateException if no handle is registered for {@code fileId}
+  */
+ @Override
+ public ICompressedPageWriter getCompressedPageWriter(int fileId) {
+     final BufferedFileHandle fInfo;
+     synchronized (fileInfoMap) {
+         fInfo = fileInfoMap.get(fileId);
+     }
+     if (fInfo == null) {
+         // The signature declares no checked exception, so report the unmapped file id with a
+         // descriptive unchecked exception instead of an anonymous NullPointerException.
+         throw new IllegalStateException("No such file mapped for fileId:" + fileId);
+     }
+     return fInfo.getCompressedPageWriter();
+ }
+
}
\ No newline at end of file