You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2018/11/07 07:15:48 UTC

[03/10] asterixdb git commit: [ASTERIXDB-2422][STO] Introduce compressed storage

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/compression/ICompressorDecompressor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/compression/ICompressorDecompressor.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/compression/ICompressorDecompressor.java
new file mode 100644
index 0000000..3a7e901
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/compression/ICompressorDecompressor.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.compression;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * An API for block compressor/decompressor.
+ *
+ * Note: Implementations should never allocate any buffer in compress/uncompress operations and must be stateless to be thread safe.
+ */
+public interface ICompressorDecompressor {
+    /**
+     * Computes the required buffer size for <i>compress()</i>.
+     *
+     * @param uBufferSize
+     *            The size of the uncompressed buffer.
+     * @return The required buffer size for compression
+     */
+    int computeCompressedBufferSize(int uBufferSize);
+
+    /**
+     * Compress <i>uBuffer</i> into <i>cBuffer</i>
+     *
+     * @param uBuffer
+     *            Uncompressed source buffer
+     * @param cBuffer
+     *            Compressed destination buffer
+     * @return Buffer after compression. ({@link ByteBuffer#limit()} is set to the compressed size)
+     * @throws HyracksDataException
+     */
+    ByteBuffer compress(ByteBuffer uBuffer, ByteBuffer cBuffer) throws HyracksDataException;
+
+    /**
+     * Uncompress <i>cBuffer</i> into <i>uBuffer</i>
+     *
+     * @param cBuffer
+     *            Compressed source buffer
+     * @param uBuffer
+     *            Uncompressed destination buffer
+     * @return Buffer after decompression. ({@link ByteBuffer#limit()} is set to the uncompressed size)
+     * @throws HyracksDataException
+     *             An exception will be thrown if the <i>uBuffer</i> size is not sufficient.
+     */
+    ByteBuffer uncompress(ByteBuffer cBuffer, ByteBuffer uBuffer) throws HyracksDataException;
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/compression/ICompressorDecompressorFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/compression/ICompressorDecompressorFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/compression/ICompressorDecompressorFactory.java
new file mode 100644
index 0000000..b813afb
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/compression/ICompressorDecompressorFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.compression;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.io.IJsonSerializable;
+
+/**
+ * {@link ICompressorDecompressor} factory.
+ *
+ * Any new implementation of this interface must also provide two methods if the compression is intended for storage:
+ * - {@link IJsonSerializable#toJson(org.apache.hyracks.api.io.IPersistedResourceRegistry)}
+ * - a static method fromJson(IPersistedResourceRegistry registry, JsonNode json)
+ */
+public interface ICompressorDecompressorFactory extends Serializable, IJsonSerializable {
+    /**
+     * Create a compressor/decompressor instance
+     *
+     * @return {@code ICompressorDecompressor}
+     */
+    ICompressorDecompressor createInstance();
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/FileReference.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/FileReference.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/FileReference.java
index 4ded855..8b93d07 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/FileReference.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/FileReference.java
@@ -27,7 +27,7 @@ import java.util.Date;
  * Used to identify a file in the local Node Controller.
  * Only used for files which are stored inside an IO device.
  */
-public final class FileReference implements Serializable {
+public class FileReference implements Serializable {
     private static final long serialVersionUID = 1L;
     private final File file;
     private final IODeviceHandle dev;
@@ -90,7 +90,11 @@ public final class FileReference implements Serializable {
     }
 
     public FileReference getChild(String name) {
-        return new FileReference(dev, path + File.separator + name);
+        return new FileReference(dev, getChildPath(name));
+    }
+
+    public String getChildPath(String name) {
+        return path + File.separator + name;
     }
 
     public void register() {
@@ -111,4 +115,8 @@ public final class FileReference implements Serializable {
         }
         registrationTime = 0;
     }
+
+    public boolean isCompressed() {
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IPersistedResourceRegistry.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IPersistedResourceRegistry.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IPersistedResourceRegistry.java
index 38162c6..333b373 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IPersistedResourceRegistry.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IPersistedResourceRegistry.java
@@ -46,4 +46,16 @@ public interface IPersistedResourceRegistry {
      * @throws HyracksDataException
      */
     IJsonSerializable deserialize(JsonNode json) throws HyracksDataException;
+
+    /**
+     * This method must be used for optional fields or newly added fields to ensure backward compatibility
+     *
+     * @param json
+     * @param clazz
+     * @return An object of the type identified by the type id in {@code json}, if present,
+     *         or an object of type <code>clazz</code> otherwise.
+     * @throws HyracksDataException
+     */
+    IJsonSerializable deserializeOrDefault(JsonNode json, Class<? extends IJsonSerializable> clazz)
+            throws HyracksDataException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
index 91acea0..b8cf066 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
@@ -29,6 +29,7 @@ import org.apache.hyracks.storage.am.lsm.common.impls.SynchronousSchedulerProvid
 import org.apache.hyracks.storage.am.lsm.common.impls.ThreadCountingOperationTrackerFactory;
 import org.apache.hyracks.storage.common.IResourceFactory;
 import org.apache.hyracks.storage.common.IStorageManager;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
 import org.apache.hyracks.tests.am.common.LSMTreeOperatorTestHelper;
 
 public class LSMBTreeOperatorTestHelper extends LSMTreeOperatorTestHelper {
@@ -46,6 +47,6 @@ public class LSMBTreeOperatorTestHelper extends LSMTreeOperatorTestHelper {
                 NoOpIOOperationCallbackFactory.INSTANCE, pageManagerFactory, getVirtualBufferCacheProvider(),
                 SynchronousSchedulerProvider.INSTANCE, MERGE_POLICY_FACTORY, MERGE_POLICY_PROPERTIES, DURABLE,
                 bloomFilterKeyFields, LSMTreeOperatorTestHelper.DEFAULT_BLOOM_FILTER_FALSE_POSITIVE_RATE, true,
-                btreefields);
+                btreefields, NoOpCompressorDecompressorFactory.INSTANCE);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
index 4fc8af9..5d2cd26 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
@@ -1043,9 +1043,9 @@ public class BTree extends AbstractTreeIndex {
 
                         ((IBTreeLeafFrame) leafFrame).setNextLeaf(leafFrontier.pageId);
 
-                        queue.put(leafFrontier.page, this);
+                        putInQueue(leafFrontier.page);
                         for (ICachedPage c : pagesToWrite) {
-                            queue.put(c, this);
+                            putInQueue(c);
                         }
                         pagesToWrite.clear();
                         splitKey.setRightPage(leafFrontier.pageId);
@@ -1152,7 +1152,7 @@ public class BTree extends AbstractTreeIndex {
                 ICachedPage lastLeaf = nodeFrontiers.get(level).page;
                 int lastLeafPage = nodeFrontiers.get(level).pageId;
                 lastLeaf.setDiskPageId(BufferedFileHandle.getDiskPageId(getFileId(), nodeFrontiers.get(level).pageId));
-                queue.put(lastLeaf, this);
+                putInQueue(lastLeaf);
                 nodeFrontiers.get(level).page = null;
                 persistFrontiers(level + 1, lastLeafPage);
                 return;
@@ -1167,7 +1167,7 @@ public class BTree extends AbstractTreeIndex {
             ((IBTreeInteriorFrame) interiorFrame).setRightmostChildPageId(rightPage);
             int finalPageId = freePageManager.takePage(metaFrame);
             frontier.page.setDiskPageId(BufferedFileHandle.getDiskPageId(getFileId(), finalPageId));
-            queue.put(frontier.page, this);
+            putInQueue(frontier.page);
             frontier.pageId = finalPageId;
             persistFrontiers(level + 1, finalPageId);
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
index 97e7ed7..7d43ed8 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
@@ -33,6 +33,7 @@ import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 import org.apache.hyracks.storage.common.buffercache.IFIFOPageQueue;
 import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback;
+import org.apache.hyracks.storage.common.compression.file.ICompressedPageWriter;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
 
 public class AppendOnlyLinkedMetadataPageManager implements IMetadataPageManager {
@@ -221,10 +222,13 @@ public class AppendOnlyLinkedMetadataPageManager implements IMetadataPageManager
             }
             int finalMetaPage = getMaxPageId(metaFrame) + 1;
             confiscatedPage.setDiskPageId(BufferedFileHandle.getDiskPageId(fileId, finalMetaPage));
+            final ICompressedPageWriter compressedPageWriter = bufferCache.getCompressedPageWriter(fileId);
+            compressedPageWriter.prepareWrite(confiscatedPage);
             // WARNING: flushing the metadata page should be done after releasing the write latch; otherwise, the page
             // won't be flushed to disk because it won't be dirty until the write latch has been released.
             queue.put(confiscatedPage, callback);
             bufferCache.finishQueue();
+            compressedPageWriter.endWriting();
             metadataPage = getMetadataPageId();
             ready = false;
         } else if (confiscatedPage != null) {
@@ -249,7 +253,8 @@ public class AppendOnlyLinkedMetadataPageManager implements IMetadataPageManager
         }
         int pages = bufferCache.getNumPagesOfFile(fileId);
         if (pages == 0) {
-            return 0;
+            // An index needs at least 2 pages to be considered non-empty
+            return IBufferCache.INVALID_PAGEID;
         }
         metadataPage = pages - 1;
         return metadataPage;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
index b77f14f..f83a27d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
@@ -39,6 +39,7 @@ import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 import org.apache.hyracks.storage.common.buffercache.IFIFOPageQueue;
 import org.apache.hyracks.storage.common.buffercache.PageWriteFailureCallback;
+import org.apache.hyracks.storage.common.compression.file.ICompressedPageWriter;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
 
 public abstract class AbstractTreeIndex implements ITreeIndex {
@@ -241,8 +242,9 @@ public abstract class AbstractTreeIndex implements ITreeIndex {
         // HDFS.  Since loading this tree relies on the root page actually being at that point, no further inserts into
         // that tree are allowed.  Currently, this is not enforced.
         protected boolean releasedLatches;
-        protected final IFIFOPageQueue queue;
+        private final IFIFOPageQueue queue;
         protected List<ICachedPage> pagesToWrite;
+        private final ICompressedPageWriter compressedPageWriter;
 
         public AbstractTreeIndexBulkLoader(float fillFactor) throws HyracksDataException {
             leafFrame = leafFrameFactory.createFrame();
@@ -278,10 +280,12 @@ public abstract class AbstractTreeIndex implements ITreeIndex {
 
             nodeFrontiers.add(leafFrontier);
             pagesToWrite = new ArrayList<>();
+            compressedPageWriter = bufferCache.getCompressedPageWriter(fileId);
         }
 
-        protected void handleException() throws HyracksDataException {
+        protected void handleException() {
             // Unlatch and unpin pages that weren't in the queue to avoid leaking memory.
+            compressedPageWriter.abort();
             for (NodeFrontier nodeFrontier : nodeFrontiers) {
                 ICachedPage frontierPage = nodeFrontier.page;
                 if (frontierPage.confiscated()) {
@@ -296,10 +300,10 @@ public abstract class AbstractTreeIndex implements ITreeIndex {
 
         @Override
         public void end() throws HyracksDataException {
-            bufferCache.finishQueue();
             if (hasFailed()) {
                 throw HyracksDataException.create(getFailure());
             }
+            bufferCache.finishQueue();
             freePageManager.setRootPageId(rootPage);
         }
 
@@ -320,6 +324,12 @@ public abstract class AbstractTreeIndex implements ITreeIndex {
         public void setLeafFrame(ITreeIndexFrame leafFrame) {
             this.leafFrame = leafFrame;
         }
+
+        public void putInQueue(ICachedPage cPage) throws HyracksDataException {
+            compressedPageWriter.prepareWrite(cPage);
+            queue.put(cPage, this);
+        }
+
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java
index 76f7e61..89ccfed 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java
@@ -36,6 +36,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
 import org.apache.hyracks.storage.common.IStorageManager;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -55,12 +56,13 @@ public class ExternalBTreeLocalResource extends LSMBTreeLocalResource {
         super(typeTraits, cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate, isPrimary, path,
                 storageManager, mergePolicyFactory, mergePolicyProperties, filterTypeTraits, filterCmpFactories,
                 btreeFields, filterFields, opTrackerProvider, ioOpCallbackFactory, metadataPageManagerFactory, null,
-                ioSchedulerProvider, durable);
+                ioSchedulerProvider, durable, NoOpCompressorDecompressorFactory.INSTANCE);
     }
 
     private ExternalBTreeLocalResource(IPersistedResourceRegistry registry, JsonNode json, int[] bloomFilterKeyFields,
             double bloomFilterFalsePositiveRate, boolean isPrimary, int[] btreeFields) throws HyracksDataException {
-        super(registry, json, bloomFilterKeyFields, bloomFilterFalsePositiveRate, isPrimary, btreeFields);
+        super(registry, json, bloomFilterKeyFields, bloomFilterFalsePositiveRate, isPrimary, btreeFields,
+                NoOpCompressorDecompressorFactory.INSTANCE);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResourceFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResourceFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResourceFactory.java
index a4c24c9..555f641 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResourceFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResourceFactory.java
@@ -30,6 +30,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
 import org.apache.hyracks.storage.am.lsm.common.dataflow.LsmResource;
 import org.apache.hyracks.storage.common.IStorageManager;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
 
 public class ExternalBTreeLocalResourceFactory extends LSMBTreeLocalResourceFactory {
 
@@ -46,7 +47,7 @@ public class ExternalBTreeLocalResourceFactory extends LSMBTreeLocalResourceFact
         super(storageManager, typeTraits, cmpFactories, filterTypeTraits, filterCmpFactories, filterFields,
                 opTrackerFactory, ioOpCallbackFactory, metadataPageManagerFactory, null, ioSchedulerProvider,
                 mergePolicyFactory, mergePolicyProperties, durable, bloomFilterKeyFields, bloomFilterFalsePositiveRate,
-                isPrimary, btreeFields);
+                isPrimary, btreeFields, NoOpCompressorDecompressorFactory.INSTANCE);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java
index 2a57e74..ddf0955 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java
@@ -36,6 +36,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
 import org.apache.hyracks.storage.common.IStorageManager;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -58,13 +59,14 @@ public class ExternalBTreeWithBuddyLocalResource extends LSMBTreeLocalResource {
         super(typeTraits, cmpFactories, buddyBtreeFields, bloomFilterFalsePositiveRate, isPrimary, path, storageManager,
                 mergePolicyFactory, mergePolicyProperties, filterTypeTraits, filterCmpFactories, btreeFields,
                 filterFields, opTrackerProvider, ioOpCallbackFactory, metadataPageManagerFactory, null,
-                ioSchedulerProvider, durable);
+                ioSchedulerProvider, durable, NoOpCompressorDecompressorFactory.INSTANCE);
     }
 
     private ExternalBTreeWithBuddyLocalResource(IPersistedResourceRegistry registry, JsonNode json,
             int[] bloomFilterKeyFields, double bloomFilterFalsePositiveRate, boolean isPrimary, int[] btreeFields)
             throws HyracksDataException {
-        super(registry, json, bloomFilterKeyFields, bloomFilterFalsePositiveRate, isPrimary, btreeFields);
+        super(registry, json, bloomFilterKeyFields, bloomFilterFalsePositiveRate, isPrimary, btreeFields,
+                NoOpCompressorDecompressorFactory.INSTANCE);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResourceFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResourceFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResourceFactory.java
index 2aff61a..89e5154 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResourceFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResourceFactory.java
@@ -30,6 +30,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
 import org.apache.hyracks.storage.am.lsm.common.dataflow.LsmResource;
 import org.apache.hyracks.storage.common.IStorageManager;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
 
 public class ExternalBTreeWithBuddyLocalResourceFactory extends LSMBTreeLocalResourceFactory {
 
@@ -46,7 +47,7 @@ public class ExternalBTreeWithBuddyLocalResourceFactory extends LSMBTreeLocalRes
         super(storageManager, typeTraits, cmpFactories, filterTypeTraits, filterCmpFactories, filterFields,
                 opTrackerProvider, ioOpCallbackFactory, metadataPageManagerFactory, null, ioSchedulerProvider,
                 mergePolicyFactory, mergePolicyProperties, durable, buddyBtreeFields, bloomFilterFalsePositiveRate,
-                isPrimary, btreeFields);
+                isPrimary, btreeFields, NoOpCompressorDecompressorFactory.INSTANCE);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
index 40278d0..7d5beff 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.compression.ICompressorDecompressorFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -40,6 +41,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
 import org.apache.hyracks.storage.am.lsm.common.dataflow.LsmResource;
 import org.apache.hyracks.storage.common.IStorageManager;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -52,6 +54,7 @@ public class LSMBTreeLocalResource extends LsmResource {
     protected final double bloomFilterFalsePositiveRate;
     protected final boolean isPrimary;
     protected final int[] btreeFields;
+    protected final ICompressorDecompressorFactory compressorDecompressorFactory;
 
     public LSMBTreeLocalResource(ITypeTraits[] typeTraits, IBinaryComparatorFactory[] cmpFactories,
             int[] bloomFilterKeyFields, double bloomFilterFalsePositiveRate, boolean isPrimary, String path,
@@ -60,7 +63,8 @@ public class LSMBTreeLocalResource extends LsmResource {
             IBinaryComparatorFactory[] filterCmpFactories, int[] btreeFields, int[] filterFields,
             ILSMOperationTrackerFactory opTrackerProvider, ILSMIOOperationCallbackFactory ioOpCallbackFactory,
             IMetadataPageManagerFactory metadataPageManagerFactory, IVirtualBufferCacheProvider vbcProvider,
-            ILSMIOOperationSchedulerProvider ioSchedulerProvider, boolean durable) {
+            ILSMIOOperationSchedulerProvider ioSchedulerProvider, boolean durable,
+            ICompressorDecompressorFactory compressorDecompressorFactory) {
         super(path, storageManager, typeTraits, cmpFactories, filterTypeTraits, filterCmpFactories, filterFields,
                 opTrackerProvider, ioOpCallbackFactory, metadataPageManagerFactory, vbcProvider, ioSchedulerProvider,
                 mergePolicyFactory, mergePolicyProperties, durable);
@@ -68,15 +72,18 @@ public class LSMBTreeLocalResource extends LsmResource {
         this.bloomFilterFalsePositiveRate = bloomFilterFalsePositiveRate;
         this.isPrimary = isPrimary;
         this.btreeFields = btreeFields;
+        this.compressorDecompressorFactory = compressorDecompressorFactory;
     }
 
     protected LSMBTreeLocalResource(IPersistedResourceRegistry registry, JsonNode json, int[] bloomFilterKeyFields,
-            double bloomFilterFalsePositiveRate, boolean isPrimary, int[] btreeFields) throws HyracksDataException {
+            double bloomFilterFalsePositiveRate, boolean isPrimary, int[] btreeFields,
+            ICompressorDecompressorFactory compressorDecompressorFactory) throws HyracksDataException {
         super(registry, json);
         this.bloomFilterKeyFields = bloomFilterKeyFields;
         this.bloomFilterFalsePositiveRate = bloomFilterFalsePositiveRate;
         this.isPrimary = isPrimary;
         this.btreeFields = btreeFields;
+        this.compressorDecompressorFactory = compressorDecompressorFactory;
     }
 
     @Override
@@ -92,7 +99,8 @@ public class LSMBTreeLocalResource extends LsmResource {
                 mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx),
                 opTrackerProvider.getOperationTracker(serviceCtx, this), ioSchedulerProvider.getIoScheduler(serviceCtx),
                 ioOpCallbackFactory, isPrimary, filterTypeTraits, filterCmpFactories, btreeFields, filterFields,
-                durable, metadataPageManagerFactory, updateAware, serviceCtx.getTracer());
+                durable, metadataPageManagerFactory, updateAware, serviceCtx.getTracer(),
+                compressorDecompressorFactory);
     }
 
     @Override
@@ -108,8 +116,11 @@ public class LSMBTreeLocalResource extends LsmResource {
         final double bloomFilterFalsePositiveRate = json.get("bloomFilterFalsePositiveRate").asDouble();
         final boolean isPrimary = json.get("isPrimary").asBoolean();
         final int[] btreeFields = OBJECT_MAPPER.convertValue(json.get("btreeFields"), int[].class);
+        final JsonNode compressorDecompressorNode = json.get("compressorDecompressorFactory");
+        final ICompressorDecompressorFactory compDecompFactory = (ICompressorDecompressorFactory) registry
+                .deserializeOrDefault(compressorDecompressorNode, NoOpCompressorDecompressorFactory.class);
         return new LSMBTreeLocalResource(registry, json, bloomFilterKeyFields, bloomFilterFalsePositiveRate, isPrimary,
-                btreeFields);
+                btreeFields, compDecompFactory);
     }
 
     @Override
@@ -120,5 +131,6 @@ public class LSMBTreeLocalResource extends LsmResource {
         json.put("bloomFilterFalsePositiveRate", bloomFilterFalsePositiveRate);
         json.put("isPrimary", isPrimary);
         json.putPOJO("btreeFields", btreeFields);
+        json.putPOJO("compressorDecompressorFactory", compressorDecompressorFactory.toJson(registry));
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java
index 5fae5b9..ea41c3d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java
@@ -20,6 +20,7 @@ package org.apache.hyracks.storage.am.lsm.btree.dataflow;
 
 import java.util.Map;
 
+import org.apache.hyracks.api.compression.ICompressorDecompressorFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.api.io.FileReference;
@@ -40,6 +41,7 @@ public class LSMBTreeLocalResourceFactory extends LsmResourceFactory {
     protected final double bloomFilterFalsePositiveRate;
     protected final boolean isPrimary;
     protected final int[] btreeFields;
+    protected final ICompressorDecompressorFactory compressorDecompressorFactory;
 
     public LSMBTreeLocalResourceFactory(IStorageManager storageManager, ITypeTraits[] typeTraits,
             IBinaryComparatorFactory[] cmpFactories, ITypeTraits[] filterTypeTraits,
@@ -48,7 +50,8 @@ public class LSMBTreeLocalResourceFactory extends LsmResourceFactory {
             IMetadataPageManagerFactory metadataPageManagerFactory, IVirtualBufferCacheProvider vbcProvider,
             ILSMIOOperationSchedulerProvider ioSchedulerProvider, ILSMMergePolicyFactory mergePolicyFactory,
             Map<String, String> mergePolicyProperties, boolean durable, int[] bloomFilterKeyFields,
-            double bloomFilterFalsePositiveRate, boolean isPrimary, int[] btreeFields) {
+            double bloomFilterFalsePositiveRate, boolean isPrimary, int[] btreeFields,
+            ICompressorDecompressorFactory compressorDecompressorFactory) {
         super(storageManager, typeTraits, cmpFactories, filterTypeTraits, filterCmpFactories, filterFields,
                 opTrackerFactory, ioOpCallbackFactory, metadataPageManagerFactory, vbcProvider, ioSchedulerProvider,
                 mergePolicyFactory, mergePolicyProperties, durable);
@@ -56,6 +59,7 @@ public class LSMBTreeLocalResourceFactory extends LsmResourceFactory {
         this.bloomFilterFalsePositiveRate = bloomFilterFalsePositiveRate;
         this.isPrimary = isPrimary;
         this.btreeFields = btreeFields;
+        this.compressorDecompressorFactory = compressorDecompressorFactory;
     }
 
     @Override
@@ -63,6 +67,6 @@ public class LSMBTreeLocalResourceFactory extends LsmResourceFactory {
         return new LSMBTreeLocalResource(typeTraits, cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate,
                 isPrimary, fileRef.getRelativePath(), storageManager, mergePolicyFactory, mergePolicyProperties,
                 filterTypeTraits, filterCmpFactories, btreeFields, filterFields, opTrackerProvider, ioOpCallbackFactory,
-                metadataPageManagerFactory, vbcProvider, ioSchedulerProvider, durable);
+                metadataPageManagerFactory, vbcProvider, ioSchedulerProvider, durable, compressorDecompressorFactory);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
index 0fc79eb..9169cbf 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
@@ -21,12 +21,14 @@ package org.apache.hyracks.storage.am.lsm.btree.impls;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.storage.am.btree.impls.BTree;
 import org.apache.hyracks.storage.am.btree.impls.DiskBTree;
 import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+import org.apache.hyracks.storage.common.compression.file.CompressedFileReference;
 
 public class LSMBTreeDiskComponent extends AbstractLSMDiskComponent {
     protected final DiskBTree btree;
@@ -80,7 +82,12 @@ public class LSMBTreeDiskComponent extends AbstractLSMDiskComponent {
 
     static Set<String> getFiles(BTree btree) {
         Set<String> files = new HashSet<>();
-        files.add(btree.getFileReference().getFile().getAbsolutePath());
+        final FileReference fileRef = btree.getFileReference();
+        files.add(fileRef.getAbsolutePath());
+        if (fileRef.isCompressed()) {
+            final CompressedFileReference cFileRef = (CompressedFileReference) fileRef;
+            files.add(cFileRef.getLAFAbsolutePath());
+        }
         return files;
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
index 2240fd9..ca5f968 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
@@ -27,6 +27,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.hyracks.api.compression.ICompressorDecompressorFactory;
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
@@ -37,6 +38,7 @@ import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManage
 import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
 import org.apache.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
 
 public class LSMBTreeFileManager extends AbstractLSMIndexFileManager {
 
@@ -46,24 +48,30 @@ public class LSMBTreeFileManager extends AbstractLSMIndexFileManager {
     private final boolean hasBloomFilter;
 
     public LSMBTreeFileManager(IIOManager ioManager, FileReference file,
-            TreeIndexFactory<? extends ITreeIndex> btreeFactory, boolean hasBloomFilter) {
-        super(ioManager, file, null);
+            TreeIndexFactory<? extends ITreeIndex> btreeFactory, boolean hasBloomFilter,
+            ICompressorDecompressorFactory compressorDecompressorFactory) {
+        super(ioManager, file, null, compressorDecompressorFactory);
         this.btreeFactory = btreeFactory;
         this.hasBloomFilter = hasBloomFilter;
     }
 
+    public LSMBTreeFileManager(IIOManager ioManager, FileReference file,
+            TreeIndexFactory<? extends ITreeIndex> btreeFactory, boolean hasBloomFilter) {
+        this(ioManager, file, btreeFactory, hasBloomFilter, NoOpCompressorDecompressorFactory.INSTANCE);
+    }
+
     @Override
     public LSMComponentFileReferences getRelFlushFileReference() throws HyracksDataException {
         String baseName = getNextComponentSequence(btreeFilter);
-        return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + BTREE_SUFFIX), null,
-                hasBloomFilter ? baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX) : null);
+        return new LSMComponentFileReferences(getFileReference(baseName + DELIMITER + BTREE_SUFFIX), null,
+                hasBloomFilter ? getFileReference(baseName + DELIMITER + BLOOM_FILTER_SUFFIX) : null);
     }
 
     @Override
     public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName) {
         final String baseName = IndexComponentFileReference.getMergeSequence(firstFileName, lastFileName);
-        return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + BTREE_SUFFIX), null,
-                hasBloomFilter ? baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX) : null);
+        return new LSMComponentFileReferences(getFileReference(baseName + DELIMITER + BTREE_SUFFIX), null,
+                hasBloomFilter ? getFileReference(baseName + DELIMITER + BLOOM_FILTER_SUFFIX) : null);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
index d071bac..70ac3f6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
@@ -21,6 +21,7 @@ package org.apache.hyracks.storage.am.lsm.btree.utils;
 
 import java.util.List;
 
+import org.apache.hyracks.api.compression.ICompressorDecompressorFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -72,8 +73,8 @@ public class LSMBTreeUtil {
             ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler,
             ILSMIOOperationCallbackFactory ioOpCallbackFactory, boolean needKeyDupCheck, ITypeTraits[] filterTypeTraits,
             IBinaryComparatorFactory[] filterCmpFactories, int[] btreeFields, int[] filterFields, boolean durable,
-            IMetadataPageManagerFactory freePageManagerFactory, boolean updateAware, ITracer tracer)
-            throws HyracksDataException {
+            IMetadataPageManagerFactory freePageManagerFactory, boolean updateAware, ITracer tracer,
+            ICompressorDecompressorFactory compressorDecompressorFactory) throws HyracksDataException {
         LSMBTreeTupleWriterFactory insertTupleWriterFactory =
                 new LSMBTreeTupleWriterFactory(typeTraits, cmpFactories.length, false, updateAware);
         LSMBTreeTupleWriterFactory deleteTupleWriterFactory =
@@ -106,10 +107,10 @@ public class LSMBTreeUtil {
             filterManager = new LSMComponentFilterManager(filterFrameFactory);
         }
 
-        //Primary LSMBTree index has a BloomFilter.
-        ILSMIndexFileManager fileNameManager =
-                new LSMBTreeFileManager(ioManager, file, diskBTreeFactory, needKeyDupCheck);
+        ILSMIndexFileManager fileNameManager = new LSMBTreeFileManager(ioManager, file, diskBTreeFactory,
+                needKeyDupCheck, compressorDecompressorFactory);
 
+        //Primary LSMBTree index has a BloomFilter.
         ILSMDiskComponentFactory componentFactory;
         ILSMDiskComponentFactory bulkLoadComponentFactory;
         if (needKeyDupCheck) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
index 904029b..7618264 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
@@ -27,6 +27,8 @@ import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 
+import org.apache.hyracks.api.compression.ICompressorDecompressor;
+import org.apache.hyracks.api.compression.ICompressorDecompressorFactory;
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
@@ -38,6 +40,9 @@ import org.apache.hyracks.storage.am.common.api.ITreeIndexMetadataFrame;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexFileManager;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressor;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
+import org.apache.hyracks.storage.common.compression.file.CompressedFileReference;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
 import org.apache.hyracks.util.annotations.NotThreadSafe;
 
@@ -71,6 +76,10 @@ public abstract class AbstractLSMIndexFileManager implements ILSMIndexFileManage
      */
     public static final String DELETE_TREE_SUFFIX = "d";
     /**
+     * Indicates Look Aside File (LAF) for compressed indexes
+     */
+    public static final String LAF_SUFFIX = ".dic";
+    /**
      * Hides transaction components until they are either committed by removing this file or deleted along with the file
      */
     public static final String TXN_PREFIX = ".T";
@@ -88,12 +97,20 @@ public abstract class AbstractLSMIndexFileManager implements ILSMIndexFileManage
     protected final Comparator<IndexComponentFileReference> recencyCmp = new RecencyComparator();
     protected final TreeIndexFactory<? extends ITreeIndex> treeFactory;
     private long lastUsedComponentSeq = UNINITALIZED_COMPONENT_SEQ;
+    private final ICompressorDecompressorFactory compressorDecompressorFactory;
 
     public AbstractLSMIndexFileManager(IIOManager ioManager, FileReference file,
             TreeIndexFactory<? extends ITreeIndex> treeFactory) {
+        this(ioManager, file, treeFactory, NoOpCompressorDecompressorFactory.INSTANCE);
+    }
+
+    public AbstractLSMIndexFileManager(IIOManager ioManager, FileReference file,
+            TreeIndexFactory<? extends ITreeIndex> treeFactory,
+            ICompressorDecompressorFactory compressorDecompressorFactory) {
         this.ioManager = ioManager;
         this.baseDir = file;
         this.treeFactory = treeFactory;
+        this.compressorDecompressorFactory = compressorDecompressorFactory;
     }
 
     protected TreeIndexState isValidTreeIndex(ITreeIndex treeIndex) throws HyracksDataException {
@@ -131,7 +148,7 @@ public abstract class AbstractLSMIndexFileManager implements ILSMIndexFileManage
             IBufferCache bufferCache) throws HyracksDataException {
         String[] files = listDirFiles(baseDir, filter);
         for (String fileName : files) {
-            FileReference fileRef = baseDir.getChild(fileName);
+            FileReference fileRef = getFileReference(fileName);
             if (treeFactory == null) {
                 allFiles.add(IndexComponentFileReference.of(fileRef));
                 continue;
@@ -362,6 +379,21 @@ public abstract class AbstractLSMIndexFileManager implements ILSMIndexFileManage
         return IndexComponentFileReference.getFlushSequence(++lastUsedComponentSeq);
     }
 
+    protected FileReference getFileReference(String name) {
+        final ICompressorDecompressor codec = compressorDecompressorFactory.createInstance();
+        //Plain reference (no LAF) for the no-op codec and for files that are never compressed
+        if (codec == NoOpCompressorDecompressor.INSTANCE || !isCompressible(name)) {
+            return baseDir.getChild(name);
+        }
+        //Compressed reference: the LAF path is the data file path plus LAF_SUFFIX
+        final String dataPath = baseDir.getChildPath(name);
+        return new CompressedFileReference(baseDir.getDeviceHandle(), codec, dataPath, dataPath + LAF_SUFFIX);
+    }
+
+    private boolean isCompressible(String fileName) {
+        return !(fileName.endsWith(BLOOM_FILTER_SUFFIX) || fileName.endsWith(DELETE_TREE_SUFFIX));
+    }
+
     private long getOnDiskLastUsedComponentSequence(FilenameFilter filenameFilter) throws HyracksDataException {
         long maxComponentSeq = -1;
         final String[] files = listDirFiles(baseDir, filenameFilter);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
index 635fe7a..b34a13c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
@@ -941,9 +941,10 @@ public class RTree extends AbstractTreeIndex {
                     propagateBulk(1, false, pagesToWrite);
 
                     leafFrontier.pageId = freePageManager.takePage(metaFrame);
-                    queue.put(leafFrontier.page, this);
+
+                    putInQueue(leafFrontier.page);
                     for (ICachedPage c : pagesToWrite) {
-                        queue.put(c, this);
+                        putInQueue(c);
                     }
                     pagesToWrite.clear();
                     leafFrontier.page = bufferCache
@@ -974,7 +975,7 @@ public class RTree extends AbstractTreeIndex {
             }
 
             for (ICachedPage c : pagesToWrite) {
-                queue.put(c, this);
+                putInQueue(c);
             }
             finish();
             super.end();
@@ -1011,7 +1012,7 @@ public class RTree extends AbstractTreeIndex {
                     ((RTreeNSMFrame) lowerFrame).adjustMBR();
                     interiorFrameTupleWriter.writeTupleFields(((RTreeNSMFrame) lowerFrame).getMBRTuples(), 0, mbr, 0);
                 }
-                queue.put(n.page, this);
+                putInQueue(n.page);
                 n.page = null;
                 prevPageId = n.pageId;
             }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml
index 9c50f2d..423925b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml
@@ -66,5 +66,10 @@
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.xerial.snappy</groupId>
+      <artifactId>snappy-java</artifactId>
+      <version>1.1.7.1</version>
+    </dependency>
   </dependencies>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java
new file mode 100644
index 0000000..47e7534
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.buffercache;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.storage.common.compression.file.CompressedFileReference;
+import org.apache.hyracks.storage.common.compression.file.ICompressedPageWriter;
+import org.apache.hyracks.util.annotations.NotThreadSafe;
+
+/**
+ * Handles all IO operations for a specified file.
+ */
+@NotThreadSafe
+public abstract class AbstractBufferedFileIOManager {
+    private static final String ERROR_MESSAGE = "%s unexpected number of bytes: [expected: %d, actual: %d, file: %s]";
+    private static final String READ = "Read";
+    private static final String WRITE = "Written";
+
+    protected final BufferCache bufferCache;
+    protected final IPageReplacementStrategy pageReplacementStrategy;
+    private final BlockingQueue<BufferCacheHeaderHelper> headerPageCache;
+    private final IIOManager ioManager;
+
+    private IFileHandle fileHandle;
+    private volatile boolean hasOpen;
+
+    protected AbstractBufferedFileIOManager(BufferCache bufferCache, IIOManager ioManager,
+            BlockingQueue<BufferCacheHeaderHelper> headerPageCache, IPageReplacementStrategy pageReplacementStrategy) {
+        this.bufferCache = bufferCache;
+        this.ioManager = ioManager;
+        this.headerPageCache = headerPageCache;
+        this.pageReplacementStrategy = pageReplacementStrategy;
+        hasOpen = false;
+    }
+
+    /* ********************************
+     * Read/Write page methods
+     * ********************************
+     */
+
+    /**
+     * Read the CachedPage from disk
+     *
+     * @param cPage
+     *            CachedPage in {@link BufferCache}
+     * @throws HyracksDataException
+     */
+    public abstract void read(CachedPage cPage) throws HyracksDataException;
+
+    /**
+     * Write the CachedPage into disk, using a header helper obtained from {@link #checkoutHeaderHelper()}
+     * NOTE(review): the helper is never returned in this method; confirm that implementations return it
+     * @param cPage
+     *            CachedPage in {@link BufferCache}
+     * @throws HyracksDataException
+     */
+    public void write(CachedPage cPage) throws HyracksDataException {
+        final int totalPages = cPage.getFrameSizeMultiplier();
+        final int extraBlockPageId = cPage.getExtraBlockPageId();
+        final BufferCacheHeaderHelper header = checkoutHeaderHelper();
+        write(cPage, header, totalPages, extraBlockPageId);
+    }
+
+    /**
+     * Write the CachedPage into disk called by {@link AbstractBufferedFileIOManager#write(CachedPage)}
+     * Note: It is the responsibility of the caller to return {@link BufferCacheHeaderHelper}
+     *
+     * @param cPage
+     *            CachedPage that will be written
+     * @param header
+     *            HeaderHelper to add into the written page
+     * @param totalPages
+     *            Number of pages to be written
+     * @param extraBlockPageId
+     *            Extra page ID in case it has more than one page
+     * @throws HyracksDataException
+     */
+    protected abstract void write(CachedPage cPage, BufferCacheHeaderHelper header, int totalPages,
+            int extraBlockPageId) throws HyracksDataException;
+
+    /* ********************************
+     * File operations' methods
+     * ********************************
+     */
+
+    /**
+     * Open the file referenced by {@code fileRef} in read-write mode and remember that it has been opened
+     *
+     * @throws HyracksDataException
+     */
+    public void open(FileReference fileRef) throws HyracksDataException {
+        fileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE,
+                IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+        hasOpen = true;
+    }
+
+    /**
+     * Close the file; does nothing when the file has never been opened
+     *
+     * @throws HyracksDataException
+     */
+    public void close() throws HyracksDataException {
+        if (hasOpen) {
+            ioManager.close(fileHandle);
+        }
+    }
+
+    public void purge() throws HyracksDataException {
+        ioManager.close(fileHandle);
+    }
+
+    /**
+     * Force the file into disk
+     *
+     * @param metadata
+     *            see {@link java.nio.channels.FileChannel#force(boolean)}
+     * @throws HyracksDataException
+     */
+    public void force(boolean metadata) throws HyracksDataException {
+        ioManager.sync(fileHandle, metadata);
+    }
+
+    /**
+     * Get the number of pages in the file
+     *
+     * @throws HyracksDataException
+     */
+    public abstract int getNumberOfPages() throws HyracksDataException;
+
+    public void markAsDeleted() throws HyracksDataException {
+        fileHandle = null;
+    }
+
+    /**
+     * Check whether the file has been marked as deleted, i.e. whether the file handle is unset
+     *
+     * @return
+     *         true if the file handle is null, false otherwise
+     */
+    public boolean hasBeenDeleted() {
+        return fileHandle == null;
+    }
+
+    /**
+     * Check whether the file has ever been opened
+     *
+     * @return
+     *         true if {@link #open(FileReference)} has completed at least once, false otherwise
+     */
+    public final boolean hasBeenOpened() {
+        return hasOpen;
+    }
+
+    public final FileReference getFileReference() {
+        return fileHandle.getFileReference();
+    }
+
+    public static void createFile(BufferCache bufferCache, FileReference fileRef) throws HyracksDataException {
+        IoUtil.create(fileRef);
+        if (fileRef.isCompressed()) {
+            final CompressedFileReference cFileRef = (CompressedFileReference) fileRef;
+            try {
+                bufferCache.createFile(cFileRef.getLAFFileReference());
+            } catch (HyracksDataException e) {
+                //In case of creating the LAF file failed, delete fileRef
+                IoUtil.delete(fileRef);
+                throw e;
+            }
+        }
+    }
+
+    public static void deleteFile(FileReference fileRef) throws HyracksDataException {
+        IoUtil.delete(fileRef);
+        if (fileRef.isCompressed()) {
+            final FileReference lafFileRef = ((CompressedFileReference) fileRef).getLAFFileReference();
+            if (lafFileRef.getFile().exists()) { //fileRef was deleted above, so probe the LAF file itself
+                IoUtil.delete(lafFileRef);
+            }
+        }
+    }
+
+    /* ********************************
+     * Compressed file methods
+     * ********************************
+     */
+
+    public abstract ICompressedPageWriter getCompressedPageWriter();
+
+    /* ********************************
+     * Common helper methods
+     * ********************************
+     */
+
+    /**
+     * Get the offset for the first page
+     *
+     * @param cPage
+     *            CachedPage for which the offset is needed
+     * @return
+     *         page offset in the file
+     */
+    protected abstract long getFirstPageOffset(CachedPage cPage);
+
+    /**
+     * Get the offset for the extra page
+     *
+     * @param cPage
+     *            CachedPage for which the offset is needed
+     * @return
+     *         page offset in the file
+     */
+    protected abstract long getExtraPageOffset(CachedPage cPage);
+
+    protected final BufferCacheHeaderHelper checkoutHeaderHelper() {
+        final BufferCacheHeaderHelper pooled = headerPageCache.poll();
+        if (pooled != null) {
+            return pooled;
+        }
+        return new BufferCacheHeaderHelper(bufferCache.getPageSize()); //pool is empty: allocate a fresh helper
+    }
+
+    protected final void returnHeaderHelper(BufferCacheHeaderHelper buffer) {
+        headerPageCache.offer(buffer); //NOSONAR
+    }
+
+    protected final long readToBuffer(ByteBuffer buf, long offset) throws HyracksDataException {
+        return ioManager.syncRead(fileHandle, offset, buf);
+    }
+
+    protected final long writeToFile(ByteBuffer buf, long offset) throws HyracksDataException {
+        return ioManager.syncWrite(fileHandle, offset, buf);
+    }
+
+    protected final long writeToFile(ByteBuffer[] buf, long offset) throws HyracksDataException {
+        return ioManager.syncWrite(fileHandle, offset, buf);
+    }
+
+    protected final long getFileSize() {
+        return ioManager.getSize(fileHandle);
+    }
+
+    protected final void verifyBytesWritten(long expected, long actual) {
+        if (expected != actual) {
+            throwException(WRITE, expected, actual);
+        }
+    }
+
+    protected final boolean verifyBytesRead(long expected, long actual) {
+        if (expected == actual) {
+            return true;
+        }
+        if (actual == -1) {
+            // no exception here: disk order scan code seems to rely on a silent return
+            return false;
+        }
+        throwException(READ, expected, actual);
+        return true; // reached only if an overriding throwException does not throw
+    }
+
+    protected void throwException(String op, long expected, long actual) {
+        final String path = fileHandle.getFileReference().getAbsolutePath();
+        throw new IllegalStateException(String.format(ERROR_MESSAGE, op, expected, actual, path));
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index 1e3f85b..7441395 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -22,7 +22,6 @@ import static org.apache.hyracks.control.nc.io.IOManager.IO_REQUEST_QUEUE_SIZE;
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -42,12 +41,12 @@ import java.util.concurrent.locks.ReentrantLock;
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.io.IFileHandle;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 import org.apache.hyracks.api.replication.IIOReplicationManager;
 import org.apache.hyracks.api.util.ExceptionUtils;
 import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.storage.common.compression.file.ICompressedPageWriter;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
 import org.apache.hyracks.storage.common.file.IFileMapManager;
 import org.apache.logging.log4j.Level;
@@ -80,18 +79,18 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
     private final BlockingQueue<BufferCacheHeaderHelper> headerPageCache =
             new ArrayBlockingQueue<>(IO_REQUEST_QUEUE_SIZE);
 
+    private IIOReplicationManager ioReplicationManager;
+    private final List<ICachedPageInternal> cachedPages = new ArrayList<>();
+    private final AtomicLong masterPinCount = new AtomicLong();
+
+    private boolean closed;
+
     //DEBUG
     private static final Level fileOpsLevel = Level.TRACE;
     private ArrayList<CachedPage> confiscatedPages;
     private Lock confiscateLock;
     private HashMap<CachedPage, StackTraceElement[]> confiscatedPagesOwner;
     private ConcurrentHashMap<CachedPage, StackTraceElement[]> pinnedPageOwner;
-    //!DEBUG
-    private IIOReplicationManager ioReplicationManager;
-    private final List<ICachedPageInternal> cachedPages = new ArrayList<>();
-    private final AtomicLong masterPinCount = new AtomicLong();
-
-    private boolean closed;
 
     public BufferCache(IIOManager ioManager, IPageReplacementStrategy pageReplacementStrategy,
             IPageCleanerPolicy pageCleanerPolicy, IFileMapManager fileMapManager, int maxOpenFiles,
@@ -158,7 +157,7 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
         synchronized (fileInfoMap) {
             fInfo = fileInfoMap.get(fileId);
         }
-        if (fInfo == null) {
+        if (fInfo == null || fInfo.hasBeenDeleted() || !fInfo.hasBeenOpened()) {
             throw new HyracksDataException("pin called on a fileId " + fileId + " that has not been created.");
         } else if (fInfo.getReferenceCount() <= 0) {
             throw new HyracksDataException("pin called on a fileId " + fileId + " that has not been opened.");
@@ -546,35 +545,9 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
     }
 
     private void read(CachedPage cPage) throws HyracksDataException {
-        BufferedFileHandle fInfo = getFileInfo(cPage);
+        BufferedFileHandle fInfo = getFileHandle(cPage);
         cPage.buffer.clear();
-        BufferCacheHeaderHelper header = checkoutHeaderHelper();
-        try {
-            long bytesRead = ioManager.syncRead(fInfo.getFileHandle(),
-                    getOffsetForPage(BufferedFileHandle.getPageId(cPage.dpid)), header.prepareRead());
-
-            if (bytesRead != getPageSizeWithHeader()) {
-                if (bytesRead == -1) {
-                    // disk order scan code seems to rely on this behavior, so silently return
-                    return;
-                }
-                throw new HyracksDataException("Failed to read a complete page: " + bytesRead);
-            }
-            int totalPages = header.processRead(cPage);
-
-            if (totalPages > 1) {
-                pageReplacementStrategy.fixupCapacityOnLargeRead(cPage);
-                cPage.buffer.position(pageSize);
-                cPage.buffer.limit(totalPages * pageSize);
-                ioManager.syncRead(fInfo.getFileHandle(), getOffsetForPage(cPage.getExtraBlockPageId()), cPage.buffer);
-            }
-        } finally {
-            returnHeaderHelper(header);
-        }
-    }
-
-    private long getOffsetForPage(long pageId) {
-        return pageId * getPageSizeWithHeader();
+        fInfo.read(cPage);
     }
 
     @Override
@@ -583,67 +556,16 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
         pageReplacementStrategy.resizePage((ICachedPageInternal) cPage, totalPages, extraPageBlockHelper);
     }
 
-    BufferedFileHandle getFileInfo(CachedPage cPage) throws HyracksDataException {
-        return getFileInfo(BufferedFileHandle.getFileId(cPage.dpid));
-    }
-
-    BufferedFileHandle getFileInfo(int fileId) throws HyracksDataException {
-        BufferedFileHandle fInfo;
-        synchronized (fileInfoMap) {
-            fInfo = fileInfoMap.get(fileId);
-        }
-        if (fInfo == null) {
-            throw HyracksDataException.create(ErrorCode.FILE_DOES_NOT_EXIST, fileId);
-        }
-        return fInfo;
-    }
-
-    private BufferCacheHeaderHelper checkoutHeaderHelper() {
-        BufferCacheHeaderHelper helper = headerPageCache.poll();
-        if (helper == null) {
-            helper = new BufferCacheHeaderHelper(pageSize);
-        }
-        return helper;
-    }
-
-    private void returnHeaderHelper(BufferCacheHeaderHelper buffer) {
-        headerPageCache.offer(buffer);
-    }
-
     void write(CachedPage cPage) throws HyracksDataException {
-        BufferedFileHandle fInfo = getFileInfo(cPage);
+        BufferedFileHandle fInfo = getFileHandle(cPage);
         // synchronize on fInfo to prevent the file handle from being deleted until the page is written.
         synchronized (fInfo) {
-            if (fInfo.fileHasBeenDeleted()) {
+            if (fInfo.hasBeenDeleted()) {
                 return;
             }
-            ByteBuffer buf = cPage.buffer.duplicate();
-            final int totalPages = cPage.getFrameSizeMultiplier();
-            final int extraBlockPageId = cPage.getExtraBlockPageId();
-            final boolean contiguousLargePages = (BufferedFileHandle.getPageId(cPage.dpid) + 1) == extraBlockPageId;
-            BufferCacheHeaderHelper header = checkoutHeaderHelper();
-            try {
-                buf.limit(contiguousLargePages ? pageSize * totalPages : pageSize);
-                buf.position(0);
-                long bytesWritten = ioManager.syncWrite(fInfo.getFileHandle(),
-                        getOffsetForPage(BufferedFileHandle.getPageId(cPage.dpid)), header.prepareWrite(cPage, buf));
-
-                if (bytesWritten != (contiguousLargePages ? pageSize * (totalPages - 1) : 0)
-                        + getPageSizeWithHeader()) {
-                    throw new HyracksDataException("Failed to write completely: " + bytesWritten);
-                }
-            } finally {
-                returnHeaderHelper(header);
-            }
-            if (totalPages > 1 && !contiguousLargePages) {
-                buf.limit(totalPages * pageSize);
-                ioManager.syncWrite(fInfo.getFileHandle(), getOffsetForPage(extraBlockPageId), buf);
-            }
-            if (buf.capacity() != pageSize * totalPages) {
-                throw new IllegalStateException("Illegal number of bytes written, expected bytes written: "
-                        + pageSize * totalPages + " actual bytes writte: " + buf.capacity());
-            }
+            fInfo.write(cPage);
         }
+
     }
 
     @Override
@@ -794,8 +716,8 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
         synchronized (fileInfoMap) {
             fileInfoMap.forEach((key, value) -> {
                 try {
-                    sweepAndFlush(key, true);
-                    ioManager.close(value.getFileHandle());
+                    sweepAndFlush(value, true);
+                    value.close();
                 } catch (HyracksDataException e) {
                     if (LOGGER.isWarnEnabled()) {
                         LOGGER.log(Level.WARN, "Error flushing file id: " + key, e);
@@ -811,11 +733,14 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
         if (LOGGER.isEnabled(fileOpsLevel)) {
             LOGGER.log(fileOpsLevel, "Creating file: " + fileRef + " in cache: " + this);
         }
-        IoUtil.create(fileRef);
+        BufferedFileHandle.createFile(this, fileRef);
+        int fileId;
         try {
             synchronized (fileInfoMap) {
-                return fileMapManager.registerFile(fileRef);
+                fileId = fileMapManager.registerFile(fileRef);
+                getOrCreateFileHandle(fileId);
             }
+            return fileId;
         } catch (Exception e) {
             // If file registration failed for any reason, we need to undo the file creation
             try {
@@ -851,11 +776,14 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
         }
         try {
             final BufferedFileHandle fInfo = getOrCreateFileHandle(fileId);
-            if (fInfo.getFileHandle() == null) {
+            // CompressedFileReference may open another file, which may sweep and close out this fInfo
+            fInfo.incReferenceCount();
+
+            if (!fInfo.hasBeenOpened()) {
                 // a new file
                 synchronized (fInfo) {
                     // prevent concurrent opening of the same file
-                    if (fInfo.getFileHandle() == null) {
+                    if (!fInfo.hasBeenOpened()) {
                         if (fileInfoMap.size() > maxOpenFiles) {
                             closeOpeningFiles(fInfo);
                         }
@@ -864,15 +792,12 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
                         synchronized (fileInfoMap) {
                             fileRef = fileMapManager.lookupFileName(fileId);
                         }
-                        IFileHandle fh = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE,
-                                IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
-                        fInfo.setFileHandle(fh);
+                        fInfo.open(fileRef);
                     }
                 }
             }
-            fInfo.incReferenceCount();
         } catch (Exception e) {
-            removeFileInfo(fileId);
+            removeFileHandle(fileId);
             throw HyracksDataException.create(e);
         }
     }
@@ -888,11 +813,11 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
                     if (fh != newFileHandle && fh.getReferenceCount() <= 0) {
                         if (fh.getReferenceCount() < 0) {
                             throw new IllegalStateException("Illegal reference count " + fh.getReferenceCount()
-                                    + " of file " + fh.getFileHandle().getFileReference());
+                                    + " of file " + fh.getFileReference());
                         }
                         int entryFileId = entry.getKey();
-                        sweepAndFlush(entryFileId, true);
-                        ioManager.close(entry.getValue().getFileHandle());
+                        sweepAndFlush(fh, true);
+                        entry.getValue().close();
                         fileInfoMap.remove(entryFileId);
                         unreferencedFileFound = true;
                         // for-each iterator is invalid because we changed
@@ -908,7 +833,12 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
         }
     }
 
-    private void sweepAndFlush(int fileId, boolean flushDirtyPages) throws HyracksDataException {
+    private void sweepAndFlush(BufferedFileHandle fInfo, boolean flushDirtyPages) throws HyracksDataException {
+        if (!fInfo.hasBeenOpened()) {
+            // Skip flushing as the file has not been opened
+            return;
+        }
+        final int fileId = fInfo.getFileId();
         for (final CacheBucket bucket : pageMap) {
             bucket.bucketLock.lock();
             try {
@@ -973,7 +903,7 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
 
         synchronized (fileInfoMap) {
             BufferedFileHandle fInfo = fileInfoMap.get(fileId);
-            if (fInfo == null) {
+            if (fInfo == null || !fInfo.hasBeenOpened()) {
                 throw new HyracksDataException("Closing unopened file");
             }
             if (fInfo.decReferenceCount() < 0) {
@@ -997,7 +927,7 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
         synchronized (fileInfoMap) {
             fInfo = fileInfoMap.get(fileId);
         }
-        ioManager.sync(fInfo.getFileHandle(), metadata);
+        fInfo.force(metadata);
     }
 
     @Override
@@ -1013,7 +943,7 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
         if (mapped) {
             deleteFile(fileId);
         } else {
-            IoUtil.delete(fileRef);
+            BufferedFileHandle.deleteFile(fileRef);
         }
     }
 
@@ -1022,11 +952,11 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
         if (LOGGER.isEnabled(fileOpsLevel)) {
             LOGGER.log(fileOpsLevel, "Deleting file: " + fileId + " in cache: " + this);
         }
-        BufferedFileHandle fInfo = removeFileInfo(fileId);
+        BufferedFileHandle fInfo = removeFileHandle(fileId);
         if (fInfo == null) {
             return;
         }
-        sweepAndFlush(fileId, false);
+        sweepAndFlush(fInfo, false);
         try {
             if (fInfo.getReferenceCount() > 0) {
                 throw new HyracksDataException("Deleting open file");
@@ -1040,11 +970,11 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
             } finally {
                 try {
                     synchronized (fInfo) {
-                        ioManager.close(fInfo.getFileHandle());
+                        fInfo.close();
                         fInfo.markAsDeleted();
                     }
                 } finally {
-                    IoUtil.delete(fileRef);
+                    BufferedFileHandle.deleteFile(fileRef);
                 }
             }
         }
@@ -1178,10 +1108,7 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
             if (fInfo == null) {
                 throw new HyracksDataException("No such file mapped for fileId:" + fileId);
             }
-            if (DEBUG) {
-                assert ioManager.getSize(fInfo.getFileHandle()) % getPageSizeWithHeader() == 0;
-            }
-            return (int) (ioManager.getSize(fInfo.getFileHandle()) / getPageSizeWithHeader());
+            return fInfo.getNumberOfPages();
         }
     }
 
@@ -1290,18 +1217,35 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
         return null;
     }
 
-    private BufferedFileHandle getOrCreateFileHandle(int fileId) {
+    private BufferedFileHandle getOrCreateFileHandle(int fileId) throws HyracksDataException {
         synchronized (fileInfoMap) {
-            return fileInfoMap.computeIfAbsent(fileId, id -> new BufferedFileHandle(fileId, null));
+            final FileReference fileRef = fileMapManager.lookupFileName(fileId);
+            return fileInfoMap.computeIfAbsent(fileId, id -> BufferedFileHandle.create(fileRef, fileId, this, ioManager,
+                    headerPageCache, pageReplacementStrategy));
         }
     }
 
-    private BufferedFileHandle removeFileInfo(int fileId) {
+    private BufferedFileHandle removeFileHandle(int fileId) {
         synchronized (fileInfoMap) {
             return fileInfoMap.remove(fileId);
         }
     }
 
+    private BufferedFileHandle getFileHandle(CachedPage cPage) throws HyracksDataException {
+        return getFileHandle(BufferedFileHandle.getFileId(cPage.dpid));
+    }
+
+    private BufferedFileHandle getFileHandle(int fileId) throws HyracksDataException {
+        BufferedFileHandle fInfo;
+        synchronized (fileInfoMap) {
+            fInfo = fileInfoMap.get(fileId);
+        }
+        if (fInfo == null) {
+            throw HyracksDataException.create(ErrorCode.FILE_DOES_NOT_EXIST, fileId);
+        }
+        return fInfo;
+    }
+
     private ICachedPage getPageLoop(long dpid, int multiplier, boolean confiscate) throws HyracksDataException {
         final long startingPinCount = DEBUG ? masterPinCount.get() : -1;
         int cycleCount = 0;
@@ -1442,54 +1386,16 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
 
     @Override
     public void purgeHandle(int fileId) throws HyracksDataException {
-        BufferedFileHandle fh = removeFileInfo(fileId);
+        BufferedFileHandle fh = removeFileHandle(fileId);
         if (fh != null) {
             synchronized (fileInfoMap) {
                 fileMapManager.unregisterFile(fileId);
+                fh.purge();
             }
-            ioManager.close(fh.getFileHandle());
         }
 
     }
 
-    static class BufferCacheHeaderHelper {
-        private static final int FRAME_MULTIPLIER_OFF = 0;
-        private static final int EXTRA_BLOCK_PAGE_ID_OFF = FRAME_MULTIPLIER_OFF + 4; // 4
-
-        private final ByteBuffer buf;
-        private final ByteBuffer[] array;
-
-        private BufferCacheHeaderHelper(int pageSize) {
-            buf = ByteBuffer.allocate(RESERVED_HEADER_BYTES + pageSize);
-            array = new ByteBuffer[] { buf, null };
-        }
-
-        private ByteBuffer[] prepareWrite(CachedPage cPage, ByteBuffer pageBuffer) {
-            buf.position(0);
-            buf.limit(RESERVED_HEADER_BYTES);
-            buf.putInt(FRAME_MULTIPLIER_OFF, cPage.getFrameSizeMultiplier());
-            buf.putInt(EXTRA_BLOCK_PAGE_ID_OFF, cPage.getExtraBlockPageId());
-            array[1] = pageBuffer;
-            return array;
-        }
-
-        private ByteBuffer prepareRead() {
-            buf.position(0);
-            buf.limit(buf.capacity());
-            return buf;
-        }
-
-        private int processRead(CachedPage cPage) {
-            buf.position(RESERVED_HEADER_BYTES);
-            cPage.buffer.position(0);
-            cPage.buffer.put(buf);
-            int multiplier = buf.getInt(FRAME_MULTIPLIER_OFF);
-            cPage.setFrameSizeMultiplier(multiplier);
-            cPage.setExtraBlockPageId(buf.getInt(EXTRA_BLOCK_PAGE_ID_OFF));
-            return multiplier;
-        }
-    }
-
     @Override
     public void closeFileIfOpen(FileReference fileRef) {
         synchronized (fileInfoMap) {
@@ -1508,4 +1414,14 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
         }
     }
 
+    @Override
+    public ICompressedPageWriter getCompressedPageWriter(int fileId) {
+        final BufferedFileHandle fInfo;
+        synchronized (fileInfoMap) {
+            fInfo = fileInfoMap.get(fileId);
+        }
+
+        return fInfo.getCompressedPageWriter();
+    }
+
 }
\ No newline at end of file