Posted to commits@asterixdb.apache.org by mh...@apache.org on 2018/11/07 07:15:47 UTC

[02/10] asterixdb git commit: [ASTERIXDB-2422][STO] Introduce compressed storage

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCacheHeaderHelper.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCacheHeaderHelper.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCacheHeaderHelper.java
new file mode 100644
index 0000000..a913513
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCacheHeaderHelper.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.buffercache;
+
+import static org.apache.hyracks.storage.common.buffercache.IBufferCache.RESERVED_HEADER_BYTES;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.compression.ICompressorDecompressor;
+
+public class BufferCacheHeaderHelper {
+    private static final int FRAME_MULTIPLIER_OFF = 0;
+    private static final int EXTRA_BLOCK_PAGE_ID_OFF = FRAME_MULTIPLIER_OFF + 4; // 4 bytes for the frame-size multiplier (int)
+
+    private final ByteBuffer[] array;
+    private final int pageSizeWithHeader;
+    private ByteBuffer buf;
+
+    public BufferCacheHeaderHelper(int pageSize) {
+        this.pageSizeWithHeader = RESERVED_HEADER_BYTES + pageSize;
+        buf = ByteBuffer.allocate(pageSizeWithHeader);
+        array = new ByteBuffer[] { buf, null };
+    }
+
+    public ByteBuffer[] prepareWrite(CachedPage cPage) {
+        setPageInfo(cPage);
+        buf.position(0);
+        buf.limit(RESERVED_HEADER_BYTES);
+        array[1] = cPage.buffer;
+        return array;
+    }
+
+    public ByteBuffer prepareWrite(CachedPage cPage, int requiredSize) {
+        ensureBufferCapacity(requiredSize);
+        setPageInfo(cPage);
+        buf.position(RESERVED_HEADER_BYTES);
+        buf.limit(buf.capacity());
+        return buf;
+    }
+
+    public ByteBuffer prepareRead(int size) {
+        buf.position(0);
+        buf.limit(size);
+        return buf;
+    }
+
+    public ByteBuffer processHeader(CachedPage cPage) {
+        cPage.setFrameSizeMultiplier(buf.getInt(FRAME_MULTIPLIER_OFF));
+        cPage.setExtraBlockPageId(buf.getInt(EXTRA_BLOCK_PAGE_ID_OFF));
+        buf.position(RESERVED_HEADER_BYTES);
+        return buf;
+    }
+
+    private void setPageInfo(CachedPage cPage) {
+        buf.putInt(FRAME_MULTIPLIER_OFF, cPage.getFrameSizeMultiplier());
+        buf.putInt(EXTRA_BLOCK_PAGE_ID_OFF, cPage.getExtraBlockPageId());
+    }
+
+    /**
+     * {@link ICompressorDecompressor#compress(byte[], int, int, byte[], int)} may require additional
+     * space to do the compression. See {@link ICompressorDecompressor#computeCompressedBufferSize(int)}.
+     *
+     * @param size
+     *            the required buffer size for the compressed data (excluding the header)
+     */
+    private void ensureBufferCapacity(int size) {
+        final int requiredSize = size + RESERVED_HEADER_BYTES;
+        if (buf.capacity() < requiredSize) {
+            buf = ByteBuffer.allocate(requiredSize);
+            array[0] = buf;
+        }
+        buf.limit(buf.capacity());
+    }
+}
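
For orientation, here is a minimal sketch (not part of the patch) of how the 8-byte header written by setPageInfo() above could be decoded from a raw on-disk page buffer. The class and method names are made up for illustration, and it assumes RESERVED_HEADER_BYTES is at least 8:

    import java.nio.ByteBuffer;

    class PageHeaderSketch {
        // The helper stores an int frame-size multiplier at offset 0 (FRAME_MULTIPLIER_OFF)
        // and an int extra-block page id at offset 4 (EXTRA_BLOCK_PAGE_ID_OFF).
        static int[] decodeHeader(ByteBuffer onDiskPage) {
            int frameSizeMultiplier = onDiskPage.getInt(0);
            int extraBlockPageId = onDiskPage.getInt(4);
            return new int[] { frameSizeMultiplier, extraBlockPageId };
        }
    }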

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
index 6ec12aa..cf0553c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
@@ -45,6 +45,8 @@ public class CachedPage implements ICachedPageInternal {
     private IQueueInfo queueInfo;
     private int multiplier;
     private int extraBlockPageId;
+    private long compressedOffset;
+    private int compressedSize;
     // DEBUG
     private static final boolean DEBUG = false;
     private final StackTraceElement[] ctorStack;
@@ -224,4 +226,23 @@ public class CachedPage implements ICachedPageInternal {
             LOGGER.error("An IO Failure took place but the failure callback is not set", e);
         }
     }
+
+    public void setCompressedPageOffset(long offset) {
+        this.compressedOffset = offset;
+    }
+
+    @Override
+    public long getCompressedPageOffset() {
+        return compressedOffset;
+    }
+
+    @Override
+    public void setCompressedPageSize(int size) {
+        this.compressedSize = size;
+    }
+
+    @Override
+    public int getCompressedPageSize() {
+        return compressedSize;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IBufferCache.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IBufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IBufferCache.java
index 21d3677..c762dd9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IBufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IBufferCache.java
@@ -21,6 +21,7 @@ package org.apache.hyracks.storage.common.buffercache;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.replication.IIOReplicationManager;
+import org.apache.hyracks.storage.common.compression.file.ICompressedPageWriter;
 
 public interface IBufferCache {
 
@@ -278,4 +279,11 @@ public interface IBufferCache {
      */
     void closeFileIfOpen(FileReference fileRef);
 
+    /**
+     * @return compressed page writer
+     */
+    default ICompressedPageWriter getCompressedPageWriter(int fileId) {
+        throw new UnsupportedOperationException(this.getClass().getName() + " does not support compressed pages");
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPageInternal.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPageInternal.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPageInternal.java
index d900852..04e93db 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPageInternal.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPageInternal.java
@@ -35,4 +35,12 @@ public interface ICachedPageInternal extends ICachedPage {
     int getExtraBlockPageId();
 
     void setExtraBlockPageId(int extraBlockPageId);
+
+    void setCompressedPageOffset(long offset);
+
+    long getCompressedPageOffset();
+
+    void setCompressedPageSize(int size);
+
+    int getCompressedPageSize();
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressor.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressor.java
new file mode 100644
index 0000000..c4855bd
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.compression;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.compression.ICompressorDecompressor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class NoOpCompressorDecompressor implements ICompressorDecompressor {
+    public static final NoOpCompressorDecompressor INSTANCE = new NoOpCompressorDecompressor();
+
+    private NoOpCompressorDecompressor() {
+    }
+
+    @Override
+    public int computeCompressedBufferSize(int uBufferSize) {
+        return 0;
+    }
+
+    @Override
+    public ByteBuffer compress(ByteBuffer uBuffer, ByteBuffer cBuffer) throws HyracksDataException {
+        return uBuffer;
+    }
+
+    @Override
+    public ByteBuffer uncompress(ByteBuffer cBuffer, ByteBuffer uBuffer) throws HyracksDataException {
+        return cBuffer;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressorFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressorFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressorFactory.java
new file mode 100644
index 0000000..690f4a2
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressorFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.compression;
+
+import org.apache.hyracks.api.compression.ICompressorDecompressor;
+import org.apache.hyracks.api.compression.ICompressorDecompressorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IJsonSerializable;
+import org.apache.hyracks.api.io.IPersistedResourceRegistry;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+public class NoOpCompressorDecompressorFactory implements ICompressorDecompressorFactory {
+    private static final long serialVersionUID = 1L;
+    public static final ICompressorDecompressorFactory INSTANCE = new NoOpCompressorDecompressorFactory();
+
+    @Override
+    public ICompressorDecompressor createInstance() {
+        return NoOpCompressorDecompressor.INSTANCE;
+    }
+
+    @Override
+    public JsonNode toJson(IPersistedResourceRegistry registry) throws HyracksDataException {
+        return registry.getClassIdentifier(getClass(), serialVersionUID);
+    }
+
+    @SuppressWarnings("squid:S1172") // unused parameter
+    public static IJsonSerializable fromJson(IPersistedResourceRegistry registry, JsonNode json) {
+        return INSTANCE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/SnappyCompressorDecompressor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/SnappyCompressorDecompressor.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/SnappyCompressorDecompressor.java
new file mode 100644
index 0000000..16c9a2d
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/SnappyCompressorDecompressor.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.compression;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.compression.ICompressorDecompressor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.xerial.snappy.Snappy;
+
+/**
+ * Built-in Snappy compressor/decompressor wrapper
+ */
+public class SnappyCompressorDecompressor implements ICompressorDecompressor {
+    protected static final SnappyCompressorDecompressor INSTANCE = new SnappyCompressorDecompressor();
+
+    private SnappyCompressorDecompressor() {
+
+    }
+
+    @Override
+    public int computeCompressedBufferSize(int uncompressedBufferSize) {
+        return Snappy.maxCompressedLength(uncompressedBufferSize);
+    }
+
+    @Override
+    public ByteBuffer compress(ByteBuffer uBuffer, ByteBuffer cBuffer) throws HyracksDataException {
+        try {
+            final int cLength = Snappy.compress(uBuffer.array(), uBuffer.position(), uBuffer.remaining(),
+                    cBuffer.array(), cBuffer.position());
+            cBuffer.limit(cBuffer.position() + cLength);
+            return cBuffer;
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    public ByteBuffer uncompress(ByteBuffer cBuffer, ByteBuffer uBuffer) throws HyracksDataException {
+        try {
+            final int uLength = Snappy.uncompress(cBuffer.array(), cBuffer.position(), cBuffer.remaining(),
+                    uBuffer.array(), uBuffer.position());
+            uBuffer.limit(uBuffer.position() + uLength);
+            return uBuffer;
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/SnappyCompressorDecompressorFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/SnappyCompressorDecompressorFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/SnappyCompressorDecompressorFactory.java
new file mode 100644
index 0000000..93f31bd
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/SnappyCompressorDecompressorFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.compression;
+
+import org.apache.hyracks.api.compression.ICompressorDecompressor;
+import org.apache.hyracks.api.compression.ICompressorDecompressorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IJsonSerializable;
+import org.apache.hyracks.api.io.IPersistedResourceRegistry;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+public class SnappyCompressorDecompressorFactory implements ICompressorDecompressorFactory {
+    private static final long serialVersionUID = 1L;
+    private static final ICompressorDecompressorFactory INSTANCE = new SnappyCompressorDecompressorFactory();
+
+    @Override
+    public ICompressorDecompressor createInstance() {
+        return SnappyCompressorDecompressor.INSTANCE;
+    }
+
+    @Override
+    public JsonNode toJson(IPersistedResourceRegistry registry) throws HyracksDataException {
+        return registry.getClassIdentifier(getClass(), serialVersionUID);
+    }
+
+    @SuppressWarnings("squid:S1172") // unused parameter
+    public static IJsonSerializable fromJson(IPersistedResourceRegistry registry, JsonNode json) {
+        return INSTANCE;
+    }
+
+}
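
As a usage illustration (a sketch, not part of the patch), a compress/uncompress round trip through the Snappy wrapper above might look like the following. It assumes heap (array-backed) buffers, since the wrapper reads them via ByteBuffer.array(); the class name is made up:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    import org.apache.hyracks.api.compression.ICompressorDecompressor;
    import org.apache.hyracks.api.exceptions.HyracksDataException;
    import org.apache.hyracks.storage.common.compression.SnappyCompressorDecompressorFactory;

    public class SnappyRoundTripSketch {
        public static void main(String[] args) throws HyracksDataException {
            final ICompressorDecompressor codec = new SnappyCompressorDecompressorFactory().createInstance();
            final byte[] data = "hello compressed storage".getBytes(StandardCharsets.UTF_8);

            final ByteBuffer uBuffer = ByteBuffer.wrap(data);
            // computeCompressedBufferSize() accounts for Snappy's worst-case expansion
            final ByteBuffer cBuffer = ByteBuffer.allocate(codec.computeCompressedBufferSize(data.length));
            final ByteBuffer compressed = codec.compress(uBuffer, cBuffer);
            // compressed.remaining() is the compressed size that would be recorded in the LAF

            final ByteBuffer restored = codec.uncompress(compressed, ByteBuffer.allocate(data.length));
            assert restored.remaining() == data.length;
        }
    }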

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileManager.java
new file mode 100644
index 0000000..70d1388
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileManager.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.compression.file;
+
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+
+import org.apache.hyracks.api.compression.ICompressorDecompressor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.buffercache.ICachedPageInternal;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+
+/**
+ * CompressedFileManager is responsible for managing the Look Aside File (LAF), which contains
+ * the compression information. The LAF format is as follows:
+ *
+ * [<offset0, size0>, <offset1, size1> .... <offsetN, sizeN>]
+ * Each entry <offsetM, sizeM> is a 16-byte entry for page M (8 bytes for the offset and 8 for the size).
+ *
+ * The file stores the offset and the size of each page after compression.
+ */
+public class CompressedFileManager {
+    protected static final int SIZE_ENTRY_OFFSET = 8; // the size field follows the 8-byte page offset within an entry
+    protected static final int ENTRY_LENGTH = 16; //<offset(8-bytes),size(8-bytes)>
+    protected static final int EOF = -1;
+    private static final EnumSet<State> CLOSED = EnumSet.of(State.CLOSED);
+    private static final EnumSet<State> READABLE_WRITABLE = EnumSet.of(State.READABLE, State.WRITABLE);
+    private static final EnumSet<State> READABLE = EnumSet.of(State.READABLE);
+    private static final EnumSet<State> WRITABLE = EnumSet.of(State.WRITABLE);
+
+    private enum State {
+        READABLE,
+        WRITABLE,
+        CLOSED
+    }
+
+    private final IBufferCache bufferCache;
+    private final int fileId;
+    private final ICompressorDecompressor compressorDecompressor;
+
+    private State state;
+    private int totalNumOfPages;
+
+    private LAFWriter lafWriter;
+
+    public CompressedFileManager(IBufferCache bufferCache, int fileId, CompressedFileReference fileRef) {
+        state = State.CLOSED;
+        totalNumOfPages = 0;
+        this.bufferCache = bufferCache;
+        this.fileId = fileId;
+        this.compressorDecompressor = fileRef.getCompressorDecompressor();
+    }
+
+    /**
+     * Open the LAF. If the file is empty (i.e. the number of pages is zero),
+     * the state will be WRITABLE; otherwise it will be READABLE.
+     *
+     * @throws HyracksDataException
+     */
+    public void open() throws HyracksDataException {
+        ensureState(CLOSED);
+        changeToFunctionalState();
+    }
+
+    /**
+     * Close the LAF file.
+     */
+    public void close() {
+        ensureState(READABLE_WRITABLE);
+        state = State.CLOSED;
+    }
+
+    /* ************************
+     * LAF writing methods
+     * ************************
+     */
+
+    public ICompressedPageWriter getCompressedPageWriter() {
+        ensureState(WRITABLE);
+        return lafWriter;
+    }
+
+    /**
+     * Add page information (offset, size) after compression.
+     *
+     * @param dpid disk page ID of the compressed page
+     * @param size size of the compressed page in bytes
+     * @return offset of the compressed page.
+     * @throws HyracksDataException
+     */
+    public long writePageInfo(long dpid, long size) throws HyracksDataException {
+        final int pageId = BufferedFileHandle.getPageId(dpid);
+        //Write the page (extraPageIndex = 0)
+        return writeExtraPageInfo(pageId, size, 0);
+    }
+
+    /**
+     * Add extra page information (offset, size) after compression.
+     *
+     * @param extraPageId
+     *            extra page ID
+     * @param size
+     *            size of the extra page
+     * @param extraPageIndex
+     *            the index of the extra page (starting from 0)
+     * @return offset for the compressed page.
+     * @throws HyracksDataException
+     */
+    public long writeExtraPageInfo(int extraPageId, long size, int extraPageIndex) throws HyracksDataException {
+        ensureState(WRITABLE);
+
+        final long compressedPageOffset;
+        try {
+            compressedPageOffset = lafWriter.writePageInfo(extraPageId + extraPageIndex, size);
+        } catch (HyracksDataException e) {
+            lafWriter.abort();
+            throw e;
+        }
+
+        return compressedPageOffset;
+    }
+
+    /**
+     * This method is used by {@link LAFWriter#endWriting()} to signal the end of writing.
+     * After calling this method, the LAF file will be READ-ONLY.
+     *
+     * @param totalNumOfPages
+     *            The total number of pages of the index
+     */
+    void endWriting(int totalNumOfPages) {
+        ensureState(WRITABLE);
+        this.totalNumOfPages = totalNumOfPages;
+        lafWriter = null;
+        state = State.READABLE;
+    }
+
+    /* ************************
+     * LAF reading methods
+     * ************************
+     */
+
+    /**
+     * Set the compressed page offset and size
+     *
+     * @param compressedPage
+     * @throws HyracksDataException
+     */
+    public void setCompressedPageInfo(ICachedPageInternal compressedPage) throws HyracksDataException {
+        setCompressedPageInfo(BufferedFileHandle.getPageId(compressedPage.getDiskPageId()), compressedPage);
+    }
+
+    /**
+     * Set the extra compressed page offset and size
+     *
+     * @param compressedPage
+     * @param extraPageIndex
+     * @throws HyracksDataException
+     */
+    public void setExtraCompressedPageInfo(ICachedPageInternal compressedPage, int extraPageIndex)
+            throws HyracksDataException {
+        setCompressedPageInfo(compressedPage.getExtraBlockPageId() + extraPageIndex, compressedPage);
+    }
+
+    /* ************************
+     * LAF general methods
+     * ************************
+     */
+
+    /**
+     * Get the number of compressed pages.
+     *
+     * @return the total number of compressed pages
+     */
+    public int getNumberOfPages() {
+        return totalNumOfPages;
+    }
+
+    public int getFileId() {
+        return fileId;
+    }
+
+    public ICompressorDecompressor getCompressorDecompressor() {
+        return compressorDecompressor;
+    }
+
+    /* ************************
+     * Private methods
+     * ************************
+     */
+
+    private void ensureState(EnumSet<State> expectedStates) {
+        if (!expectedStates.contains(state)) {
+            throw new IllegalStateException(
+                    "Expecting the state to be " + expectedStates + ". Currently it is " + state);
+        }
+    }
+
+    private void changeToFunctionalState() throws HyracksDataException {
+        if (bufferCache.getNumPagesOfFile(fileId) == 0) {
+            state = State.WRITABLE;
+            lafWriter = new LAFWriter(this, bufferCache);
+        } else {
+            state = State.READABLE;
+            init();
+        }
+    }
+
+    private void init() throws HyracksDataException {
+        final int numOfPages = bufferCache.getNumPagesOfFile(fileId);
+        //Maximum number of entries in a page
+        final int numOfEntriesPerPage = bufferCache.getPageSize() / ENTRY_LENGTH;
+        //Get the last page, which may contain fewer entries than numOfEntriesPerPage
+        final long dpid = getDiskPageId(numOfPages - 1);
+        final ICachedPage page = bufferCache.pin(dpid, false);
+        try {
+            final ByteBuffer buf = page.getBuffer();
+
+            //Start at 1 since it is impossible to have EOF at the first entry of a page
+            int i = 1;
+            //Seek EOF and count number of entries
+            while (i < numOfEntriesPerPage && buf.getLong(i * ENTRY_LENGTH) != EOF) {
+                i++;
+            }
+
+            totalNumOfPages = (numOfPages - 1) * numOfEntriesPerPage + i;
+        } finally {
+            bufferCache.unpin(page);
+        }
+    }
+
+    private ICachedPage pinAndGetPage(int compressedPageId) throws HyracksDataException {
+        final int pageId = compressedPageId * ENTRY_LENGTH / bufferCache.getPageSize();
+        return bufferCache.pin(getDiskPageId(pageId), false);
+    }
+
+    private long getDiskPageId(int pageId) {
+        return BufferedFileHandle.getDiskPageId(fileId, pageId);
+    }
+
+    private void setCompressedPageInfo(int compressedPageId, ICachedPageInternal compressedPage)
+            throws HyracksDataException {
+        ensureState(READABLE);
+        if (totalNumOfPages == 0) {
+            /*
+             * It is apparently legal to pin a page of an empty file.
+             * Return the page information as if the page is not compressed.
+             */
+            compressedPage.setCompressedPageOffset(0);
+            compressedPage.setCompressedPageSize(bufferCache.getPageSize());
+            return;
+        }
+        final ICachedPage page = pinAndGetPage(compressedPageId);
+        try {
+            // No need for read latches as pages are immutable.
+            final ByteBuffer buf = page.getBuffer();
+            final int entryOffset = compressedPageId * ENTRY_LENGTH % bufferCache.getPageSize();
+            compressedPage.setCompressedPageOffset(buf.getLong(entryOffset));
+            compressedPage.setCompressedPageSize((int) buf.getLong(entryOffset + SIZE_ENTRY_OFFSET));
+        } finally {
+            bufferCache.unpin(page);
+        }
+    }
+
+}
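
To make the LAF entry addressing concrete, here is a minimal sketch (not part of the patch) of the arithmetic used by pinAndGetPage() and setCompressedPageInfo() above; the helper and class names are made up:

    import java.nio.ByteBuffer;

    class LafEntrySketch {
        static final int ENTRY_LENGTH = 16;     // <offset(8-bytes), size(8-bytes)>
        static final int SIZE_ENTRY_OFFSET = 8; // the size field follows the 8-byte offset field

        // The entry for compressed page M starts at byte M * ENTRY_LENGTH of the LAF, i.e. it lives
        // in LAF page (M * ENTRY_LENGTH / pageSize) at offset (M * ENTRY_LENGTH % pageSize).
        static long[] readEntry(ByteBuffer pinnedLafPage, int compressedPageId, int pageSize) {
            int entryOffset = compressedPageId * ENTRY_LENGTH % pageSize;
            long pageOffset = pinnedLafPage.getLong(entryOffset);
            long compressedSize = pinnedLafPage.getLong(entryOffset + SIZE_ENTRY_OFFSET);
            return new long[] { pageOffset, compressedSize };
        }
    }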

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileReference.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileReference.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileReference.java
new file mode 100644
index 0000000..3fb682c
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileReference.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.compression.file;
+
+import java.util.Objects;
+
+import org.apache.hyracks.api.compression.ICompressorDecompressor;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IODeviceHandle;
+
+public class CompressedFileReference extends FileReference {
+    private static final long serialVersionUID = 1L;
+
+    private final String lafPath;
+    private final FileReference lafFileRef;
+    private final transient ICompressorDecompressor compressorDecompressor;
+
+    public CompressedFileReference(IODeviceHandle dev, ICompressorDecompressor compressorDecompressor, String path,
+            String lafPath) {
+        super(dev, path);
+        this.lafPath = lafPath;
+        lafFileRef = new FileReference(dev, lafPath);
+        this.compressorDecompressor = compressorDecompressor;
+    }
+
+    public FileReference getLAFFileReference() {
+        return lafFileRef;
+    }
+
+    public ICompressorDecompressor getCompressorDecompressor() {
+        return compressorDecompressor;
+    }
+
+    @Override
+    public boolean delete() {
+        return lafFileRef.delete() && super.delete();
+    }
+
+    @Override
+    public boolean isCompressed() {
+        return true;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof CompressedFileReference)) {
+            return false;
+        }
+        return super.equals(o) && lafPath.equals(((CompressedFileReference) o).lafPath);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.getRelativePath(), lafPath);
+    }
+
+    /**
+     * @return the relative path of the LAF file
+     */
+    public String getLAFRelativePath() {
+        return lafPath;
+    }
+
+    /**
+     * @return the absolute path of the LAF file
+     */
+    public String getLAFAbsolutePath() {
+        return lafFileRef.getAbsolutePath();
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/ICompressedPageWriter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/ICompressedPageWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/ICompressedPageWriter.java
new file mode 100644
index 0000000..86525b0
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/ICompressedPageWriter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.compression.file;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+
+/**
+ * An interface that exposes the Look Aside File (LAF) writer to the indexes.
+ */
+public interface ICompressedPageWriter {
+    /**
+     * Before the index can write a compressed page, the index has to prepare the writer.
+     *
+     * @param cPage
+     * @throws HyracksDataException
+     */
+    void prepareWrite(ICachedPage cPage) throws HyracksDataException;
+
+    /**
+     * Signal the writer to abort.
+     */
+    void abort();
+
+    /**
+     * Finalize the writing of the compressed pages.
+     *
+     * @throws HyracksDataException
+     */
+    void endWriting() throws HyracksDataException;
+}
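
A hedged sketch (not part of the patch) of the protocol an index bulk-loader would follow with this interface; the surrounding method and the way pages are produced are illustrative assumptions:

    import java.util.List;

    import org.apache.hyracks.api.exceptions.HyracksDataException;
    import org.apache.hyracks.storage.common.buffercache.IBufferCache;
    import org.apache.hyracks.storage.common.buffercache.ICachedPage;
    import org.apache.hyracks.storage.common.compression.file.ICompressedPageWriter;

    class CompressedWriteProtocolSketch {
        static void writeAll(IBufferCache bufferCache, int fileId, List<ICachedPage> pages)
                throws HyracksDataException {
            final ICompressedPageWriter writer = bufferCache.getCompressedPageWriter(fileId);
            for (ICachedPage page : pages) {
                writer.prepareWrite(page); // reserve the LAF entry page(s) before the page is queued
                // ... compress the page and hand it to the buffer cache write queue ...
            }
            // On success, flush the LAF pages and make the LAF read-only.
            // If the caller needs to cancel instead, it would call writer.abort().
            writer.endWriting();
        }
    }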

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/LAFWriter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/LAFWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/LAFWriter.java
new file mode 100644
index 0000000..dcccc52
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/LAFWriter.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.compression.file;
+
+import static org.apache.hyracks.storage.common.compression.file.CompressedFileManager.ENTRY_LENGTH;
+import static org.apache.hyracks.storage.common.compression.file.CompressedFileManager.EOF;
+import static org.apache.hyracks.storage.common.compression.file.CompressedFileManager.SIZE_ENTRY_OFFSET;
+
+import java.util.ArrayDeque;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.buffercache.ICachedPageInternal;
+import org.apache.hyracks.storage.common.buffercache.IFIFOPageQueue;
+import org.apache.hyracks.storage.common.buffercache.PageWriteFailureCallback;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.util.annotations.NotThreadSafe;
+
+/**
+ * Look Aside File (LAF) writer.
+ * This class is called by two threads simultaneously:
+ * - a thread that prepares the LAF pages (the bulk-loader)
+ * - a writer thread that writes the LAF pages (the buffer cache writer thread)
+ * Hence, it is not safe to have more than one thread preparing, or more than one thread writing, LAF pages.
+ */
+@NotThreadSafe
+class LAFWriter implements ICompressedPageWriter {
+    private final CompressedFileManager compressedFileManager;
+    private final IBufferCache bufferCache;
+    private final IFIFOPageQueue queue;
+    private final Map<Integer, LAFFrame> cachedFrames;
+    private final Queue<LAFFrame> recycledFrames;
+    private final int fileId;
+    private final int maxNumOfEntries;
+    private final PageWriteFailureCallback callBack;
+    private LAFFrame currentFrame;
+    private int currentPageId;
+    private int maxPageId;
+
+    private long lastOffset;
+    private int totalNumOfPages;
+
+    public LAFWriter(CompressedFileManager compressedFileManager, IBufferCache bufferCache) {
+        this.compressedFileManager = compressedFileManager;
+        this.bufferCache = bufferCache;
+        queue = bufferCache.createFIFOQueue();
+        cachedFrames = new HashMap<>();
+        recycledFrames = new ArrayDeque<>();
+        this.fileId = compressedFileManager.getFileId();
+        callBack = new PageWriteFailureCallback();
+
+        maxNumOfEntries = bufferCache.getPageSize() / ENTRY_LENGTH;
+        lastOffset = 0;
+        totalNumOfPages = 0;
+        maxPageId = -1;
+        currentPageId = -1;
+    }
+
+    /* ************************************
+     * ICompressedPageWriter methods
+     * Called by non-BufferCache thread (Bulk-loader)
+     * ************************************
+     */
+
+    @Override
+    public void prepareWrite(ICachedPage cPage) throws HyracksDataException {
+        final ICachedPageInternal internalPage = (ICachedPageInternal) cPage;
+        final int entryPageId = getLAFEntryPageId(BufferedFileHandle.getPageId(internalPage.getDiskPageId()));
+
+        synchronized (cachedFrames) {
+            if (!cachedFrames.containsKey(entryPageId)) {
+                try {
+                    //Writing new page(s). Confiscate the page(s) from the buffer cache.
+                    prepareFrames(entryPageId, internalPage);
+                } catch (HyracksDataException e) {
+                    abort();
+                    throw e;
+                }
+            }
+        }
+    }
+
+    private void prepareFrames(int entryPageId, ICachedPageInternal cPage) throws HyracksDataException {
+        //Confiscate the first page
+        confiscatePage(entryPageId);
+        //Check whether the extra pages span to the next entry page
+        for (int i = 0; i < cPage.getFrameSizeMultiplier() - 1; i++) {
+            final int extraEntryPageId = getLAFEntryPageId(cPage.getExtraBlockPageId() + i);
+            if (!cachedFrames.containsKey(extraEntryPageId)) {
+                confiscatePage(extraEntryPageId);
+            }
+        }
+    }
+
+    private void confiscatePage(int pageId) throws HyracksDataException {
+        //Writing new page. Confiscate the page from the buffer cache.
+        final ICachedPage newPage = bufferCache.confiscatePage(BufferedFileHandle.getDiskPageId(fileId, pageId));
+        cachedFrames.put(pageId, getLAFFrame(newPage));
+        maxPageId = Math.max(maxPageId, pageId);
+    }
+
+    private LAFFrame getLAFFrame(ICachedPage cPage) {
+        LAFFrame lafFrame = recycledFrames.poll();
+        if (lafFrame == null) {
+            lafFrame = new LAFFrame();
+        }
+        lafFrame.setCachedPage(cPage);
+        return lafFrame;
+    }
+
+    @Override
+    public void endWriting() throws HyracksDataException {
+        if (callBack.hasFailed()) {
+            //if write failed, return confiscated pages
+            abort();
+            throw HyracksDataException.create(callBack.getFailure());
+        }
+        synchronized (cachedFrames) {
+            final LAFFrame lastPage = cachedFrames.get(maxPageId);
+            if (lastPage != null && !lastPage.isFull()) {
+                /*
+                 * The last page may or may not be full. If it is not full (i.e. it does not have
+                 * the max number of entries), write an EOF indicator after the last entry.
+                 * If lastPage is null, the page has already been written, which means it was full.
+                 */
+                lastPage.setEOF();
+            }
+            for (Entry<Integer, LAFFrame> entry : cachedFrames.entrySet()) {
+                queue.put(entry.getValue().cPage, callBack);
+            }
+            bufferCache.finishQueue();
+
+            //Signal the compressedFileManager to change its state
+            compressedFileManager.endWriting(totalNumOfPages);
+        }
+    }
+
+    @Override
+    public void abort() {
+        synchronized (cachedFrames) {
+            for (Entry<Integer, LAFFrame> frame : cachedFrames.entrySet()) {
+                bufferCache.returnPage(frame.getValue().cPage);
+            }
+        }
+    }
+
+    /* ************************************
+     * Local methods:
+     * Called by BufferCache writer thread
+     * ************************************
+     */
+
+    public long writePageInfo(int pageId, long size) throws HyracksDataException {
+        final LAFFrame frame = getPageBuffer(pageId);
+
+        final long pageOffset = lastOffset;
+        frame.writePageInfo(pageId, pageOffset, size);
+        lastOffset += size;
+        totalNumOfPages++;
+
+        writeFullPage();
+        return pageOffset;
+    }
+
+    private LAFFrame getPageBuffer(int compressedPageId) {
+        final int pageId = getLAFEntryPageId(compressedPageId);
+
+        if (currentPageId == pageId) {
+            return currentFrame;
+        }
+
+        final LAFFrame frame;
+        synchronized (cachedFrames) {
+            //Check if the frame is cached
+            frame = cachedFrames.get(pageId);
+            if (frame == null) {
+                //Trying to write unprepared page
+                abort();
+                throw new IllegalStateException("Unprepared compressed-write for page ID: " + pageId);
+            }
+        }
+
+        currentFrame = frame;
+        currentPageId = pageId;
+        return frame;
+    }
+
+    private void writeFullPage() throws HyracksDataException {
+        if (currentFrame.isFull()) {
+            //The LAF page is filled. We do not need to keep it.
+            //Write it to the file and remove it from the cachedFrames map
+            queue.put(currentFrame.cPage, callBack);
+            synchronized (cachedFrames) {
+                //Recycle the frame
+                final LAFFrame frame = cachedFrames.remove(currentPageId);
+                frame.setCachedPage(null);
+                recycledFrames.add(frame);
+            }
+            currentFrame = null;
+            currentPageId = -1;
+        }
+    }
+
+    private int getLAFEntryPageId(int compressedPageId) {
+        return compressedPageId * ENTRY_LENGTH / bufferCache.getPageSize();
+    }
+
+    private class LAFFrame {
+        private ICachedPage cPage;
+        private int numOfEntries;
+        private int maxEntryOffset;
+
+        public void setCachedPage(ICachedPage cPage) {
+            this.cPage = cPage;
+            numOfEntries = 0;
+            maxEntryOffset = -1;
+        }
+
+        public void writePageInfo(int compressedPageId, long offset, long size) {
+            final int entryOffset = compressedPageId * ENTRY_LENGTH % bufferCache.getPageSize();
+            //Put page offset
+            cPage.getBuffer().putLong(entryOffset, offset);
+            //Put page size
+            cPage.getBuffer().putLong(entryOffset + SIZE_ENTRY_OFFSET, size);
+            //Keep the max entry offset to set EOF (if needed)
+            maxEntryOffset = Math.max(maxEntryOffset, entryOffset);
+            numOfEntries++;
+        }
+
+        public void setEOF() {
+            cPage.getBuffer().putLong(maxEntryOffset + ENTRY_LENGTH, EOF);
+        }
+
+        public boolean isFull() {
+            return numOfEntries == maxNumOfEntries;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/NoOpLAFWriter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/NoOpLAFWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/NoOpLAFWriter.java
new file mode 100644
index 0000000..8f957a9
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/NoOpLAFWriter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.compression.file;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+
+public class NoOpLAFWriter implements ICompressedPageWriter {
+    public static final NoOpLAFWriter INSTANCE = new NoOpLAFWriter();
+
+    private NoOpLAFWriter() {
+    }
+
+    @Override
+    public void prepareWrite(ICachedPage cPage) throws HyracksDataException {
+        //NoOp
+    }
+
+    @Override
+    public void abort() {
+        //NoOp
+    }
+
+    @Override
+    public void endWriting() throws HyracksDataException {
+        //NoOp
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java
index 4f15588..11862dc 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java
@@ -18,18 +18,32 @@
  */
 package org.apache.hyracks.storage.common.file;
 
-import java.util.concurrent.atomic.AtomicInteger;
+import static org.apache.hyracks.storage.common.buffercache.BufferCache.DEBUG;
 
-import org.apache.hyracks.api.io.IFileHandle;
+import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 
-public class BufferedFileHandle {
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.storage.common.buffercache.AbstractBufferedFileIOManager;
+import org.apache.hyracks.storage.common.buffercache.BufferCache;
+import org.apache.hyracks.storage.common.buffercache.BufferCacheHeaderHelper;
+import org.apache.hyracks.storage.common.buffercache.CachedPage;
+import org.apache.hyracks.storage.common.buffercache.IPageReplacementStrategy;
+import org.apache.hyracks.storage.common.compression.file.CompressedFileReference;
+import org.apache.hyracks.storage.common.compression.file.ICompressedPageWriter;
+import org.apache.hyracks.storage.common.compression.file.NoOpLAFWriter;
+
+public class BufferedFileHandle extends AbstractBufferedFileIOManager {
     private final int fileId;
-    private volatile IFileHandle handle;
     private final AtomicInteger refCount;
 
-    public BufferedFileHandle(int fileId, IFileHandle handle) {
+    protected BufferedFileHandle(int fileId, BufferCache bufferCache, IIOManager ioManager,
+            BlockingQueue<BufferCacheHeaderHelper> headerPageCache, IPageReplacementStrategy pageReplacementStrategy) {
+        super(bufferCache, ioManager, headerPageCache, pageReplacementStrategy);
         this.fileId = fileId;
-        this.handle = handle;
         refCount = new AtomicInteger();
     }
 
@@ -37,22 +51,6 @@ public class BufferedFileHandle {
         return fileId;
     }
 
-    public void setFileHandle(IFileHandle fileHandle) {
-        this.handle = fileHandle;
-    }
-
-    public IFileHandle getFileHandle() {
-        return handle;
-    }
-
-    public void markAsDeleted() {
-        handle = null;
-    }
-
-    public boolean fileHasBeenDeleted() {
-        return handle == null;
-    }
-
     public int incReferenceCount() {
         return refCount.incrementAndGet();
     }
@@ -69,6 +67,86 @@ public class BufferedFileHandle {
         return getDiskPageId(fileId, pageId);
     }
 
+    @Override
+    public void read(CachedPage cPage) throws HyracksDataException {
+        final BufferCacheHeaderHelper header = checkoutHeaderHelper();
+        try {
+            long bytesRead =
+                    readToBuffer(header.prepareRead(bufferCache.getPageSizeWithHeader()), getFirstPageOffset(cPage));
+
+            if (!verifyBytesRead(bufferCache.getPageSizeWithHeader(), bytesRead)) {
+                return;
+            }
+
+            final ByteBuffer buf = header.processHeader(cPage);
+            cPage.getBuffer().put(buf);
+        } finally {
+            returnHeaderHelper(header);
+        }
+
+        readExtraPages(cPage);
+    }
+
+    private void readExtraPages(CachedPage cPage) throws HyracksDataException {
+        final int totalPages = cPage.getFrameSizeMultiplier();
+        if (totalPages > 1) {
+            pageReplacementStrategy.fixupCapacityOnLargeRead(cPage);
+            cPage.getBuffer().position(bufferCache.getPageSize());
+            cPage.getBuffer().limit(totalPages * bufferCache.getPageSize());
+            readToBuffer(cPage.getBuffer(), getExtraPageOffset(cPage));
+        }
+    }
+
+    @Override
+    protected void write(CachedPage cPage, BufferCacheHeaderHelper header, int totalPages, int extraBlockPageId)
+            throws HyracksDataException {
+        final ByteBuffer buf = cPage.getBuffer();
+        final boolean contiguousLargePages = getPageId(cPage.getDiskPageId()) + 1 == extraBlockPageId;
+        long bytesWritten;
+        try {
+            buf.limit(contiguousLargePages ? bufferCache.getPageSize() * totalPages : bufferCache.getPageSize());
+            buf.position(0);
+            bytesWritten = writeToFile(header.prepareWrite(cPage), getFirstPageOffset(cPage));
+        } finally {
+            returnHeaderHelper(header);
+        }
+
+        if (totalPages > 1 && !contiguousLargePages) {
+            buf.limit(totalPages * bufferCache.getPageSize());
+            bytesWritten += writeToFile(buf, getExtraPageOffset(cPage));
+        }
+
+        final int expectedWritten = bufferCache.getPageSizeWithHeader() + bufferCache.getPageSize() * (totalPages - 1);
+        verifyBytesWritten(expectedWritten, bytesWritten);
+    }
+
+    @Override
+    public int getNumberOfPages() {
+        if (DEBUG) {
+            assert getFileSize() % bufferCache.getPageSizeWithHeader() == 0;
+        }
+        return (int) (getFileSize() / bufferCache.getPageSizeWithHeader());
+    }
+
+    @Override
+    public ICompressedPageWriter getCompressedPageWriter() {
+        return NoOpLAFWriter.INSTANCE;
+    }
+
+    @Override
+    protected long getFirstPageOffset(CachedPage cPage) {
+        return getPageOffset(getPageId(cPage.getDiskPageId()));
+    }
+
+    @Override
+    protected long getExtraPageOffset(CachedPage cPage) {
+        return getPageOffset(cPage.getExtraBlockPageId());
+    }
+
+    private long getPageOffset(long pageId) {
+        return pageId * bufferCache.getPageSizeWithHeader();
+    }
+
     public static long getDiskPageId(int fileId, int pageId) {
         return (((long) fileId) << 32) + pageId;
     }
@@ -80,4 +158,15 @@ public class BufferedFileHandle {
     public static int getPageId(long dpid) {
         return (int) dpid;
     }
+
+    public static BufferedFileHandle create(FileReference fileRef, int fileId, BufferCache bufferCache,
+            IIOManager ioManager, BlockingQueue<BufferCacheHeaderHelper> headerPageCache,
+            IPageReplacementStrategy pageReplacementStrategy) {
+        if (fileRef.isCompressed()) {
+            final CompressedFileReference cFileRef = (CompressedFileReference) fileRef;
+            return new CompressedBufferedFileHandle(fileId, cFileRef.getLAFFileReference(), bufferCache, ioManager,
+                    headerPageCache, pageReplacementStrategy);
+        }
+        return new BufferedFileHandle(fileId, bufferCache, ioManager, headerPageCache, pageReplacementStrategy);
+    }
 }
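
Editor's note: BufferedFileHandle addresses a page with a single long that packs the file id into the high 32 bits and the page id into the low 32 bits, and a page's on-disk offset is its page id times the page size including the per-page header. Below is a minimal, standalone sketch of that arithmetic only; the class and constant names are illustrative, and the getFileId counterpart is assumed for symmetry (it is not part of the hunks shown above).

    // Standalone sketch of the disk-page-id packing and offset math (illustrative names).
    public final class DiskPageIdSketch {
        // Stand-in for bufferCache.getPageSizeWithHeader(); the value is an assumption.
        private static final long PAGE_SIZE_WITH_HEADER = 128 * 1024 + 64;

        static long getDiskPageId(int fileId, int pageId) {
            return (((long) fileId) << 32) + pageId; // file id in the high 32 bits, page id in the low 32 bits
        }

        static int getFileId(long dpid) {
            return (int) (dpid >> 32); // assumed counterpart, shown here for symmetry
        }

        static int getPageId(long dpid) {
            return (int) dpid;
        }

        static long getPageOffset(long pageId) {
            return pageId * PAGE_SIZE_WITH_HEADER; // every on-disk page carries its reserved header
        }

        public static void main(String[] args) {
            long dpid = getDiskPageId(7, 42);
            System.out.println(getFileId(dpid) + " / " + getPageId(dpid)); // prints 7 / 42
            System.out.println("offset of page 42 = " + getPageOffset(getPageId(dpid)));
        }
    }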

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
new file mode 100644
index 0000000..235e144
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.file;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.hyracks.api.compression.ICompressorDecompressor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.storage.common.buffercache.BufferCache;
+import org.apache.hyracks.storage.common.buffercache.BufferCacheHeaderHelper;
+import org.apache.hyracks.storage.common.buffercache.CachedPage;
+import org.apache.hyracks.storage.common.buffercache.IPageReplacementStrategy;
+import org.apache.hyracks.storage.common.compression.file.CompressedFileManager;
+import org.apache.hyracks.storage.common.compression.file.CompressedFileReference;
+import org.apache.hyracks.storage.common.compression.file.ICompressedPageWriter;
+
+public class CompressedBufferedFileHandle extends BufferedFileHandle {
+    private final FileReference lafFileRef;
+    private volatile CompressedFileManager compressedFileManager;
+
+    protected CompressedBufferedFileHandle(int fileId, FileReference lafFileRef, BufferCache bufferCache,
+            IIOManager ioManager, BlockingQueue<BufferCacheHeaderHelper> headerPageCache,
+            IPageReplacementStrategy pageReplacementStrategy) {
+        super(fileId, bufferCache, ioManager, headerPageCache, pageReplacementStrategy);
+        this.lafFileRef = lafFileRef;
+    }
+
+    @Override
+    public void read(CachedPage cPage) throws HyracksDataException {
+        final BufferCacheHeaderHelper header = checkoutHeaderHelper();
+        try {
+            compressedFileManager.setCompressedPageInfo(cPage);
+            long bytesRead = readToBuffer(header.prepareRead(cPage.getCompressedPageSize()), getFirstPageOffset(cPage));
+
+            if (!verifyBytesRead(cPage.getCompressedPageSize(), bytesRead)) {
+                return;
+            }
+            final ByteBuffer cBuffer = header.processHeader(cPage);
+            final ByteBuffer uBuffer = cPage.getBuffer();
+            fixBufferPointers(uBuffer, 0);
+            if (cPage.getCompressedPageSize() < bufferCache.getPageSizeWithHeader()) {
+                uncompressToPageBuffer(cBuffer, uBuffer);
+            } else {
+                cPage.getBuffer().put(cBuffer);
+            }
+
+            final int totalPages = cPage.getFrameSizeMultiplier();
+            if (totalPages > 1) {
+                pageReplacementStrategy.fixupCapacityOnLargeRead(cPage);
+                readExtraPages(cPage, cBuffer);
+            }
+        } finally {
+            returnHeaderHelper(header);
+        }
+    }
+
+    private void readExtraPages(CachedPage cPage, ByteBuffer cBuffer) throws HyracksDataException {
+        final ByteBuffer uBuffer = cPage.getBuffer();
+
+        final int totalPages = cPage.getFrameSizeMultiplier();
+        for (int i = 1; i < totalPages; i++) {
+            fixBufferPointers(uBuffer, i);
+            compressedFileManager.setExtraCompressedPageInfo(cPage, i - 1);
+            if (cPage.getCompressedPageSize() < bufferCache.getPageSize()) {
+                cBuffer.position(0);
+                cBuffer.limit(cPage.getCompressedPageSize());
+                readToBuffer(cBuffer, getExtraPageOffset(cPage));
+                cBuffer.flip();
+                uncompressToPageBuffer(cBuffer, cPage.getBuffer());
+            } else {
+                readToBuffer(uBuffer, getExtraPageOffset(cPage));
+            }
+        }
+    }
+
+    @Override
+    protected void write(CachedPage cPage, BufferCacheHeaderHelper header, int totalPages, int extraBlockPageId)
+            throws HyracksDataException {
+        try {
+            final ByteBuffer cBuffer = header.prepareWrite(cPage, getRequiredBufferSize());
+            final ByteBuffer uBuffer = cPage.getBuffer();
+            final long pageId = cPage.getDiskPageId();
+
+            final long bytesWritten;
+            final long expectedBytesWritten;
+
+            fixBufferPointers(uBuffer, 0);
+            if (compressToWriteBuffer(uBuffer, cBuffer) < bufferCache.getPageSize()) {
+                cBuffer.position(0);
+                final long offset = compressedFileManager.writePageInfo(pageId, cBuffer.remaining());
+                expectedBytesWritten = cBuffer.limit();
+                bytesWritten = writeToFile(cBuffer, offset);
+            } else {
+                // Compression did not save any space; fall back to writing the page uncompressed
+                final ByteBuffer[] buffers = header.prepareWrite(cPage);
+                final long offset = compressedFileManager.writePageInfo(pageId, bufferCache.getPageSizeWithHeader());
+                expectedBytesWritten = buffers[0].limit() + (long) buffers[1].limit();
+                bytesWritten = writeToFile(buffers, offset);
+            }
+
+            verifyBytesWritten(expectedBytesWritten, bytesWritten);
+
+            // Write the remaining pages of a multi-page (large) frame
+            if (totalPages > 1) {
+                writeExtraCompressedPages(cPage, cBuffer, totalPages, extraBlockPageId);
+            }
+
+        } finally {
+            returnHeaderHelper(header);
+        }
+    }
+
+    private void writeExtraCompressedPages(CachedPage cPage, ByteBuffer cBuffer, int totalPages, int extraBlockPageId)
+            throws HyracksDataException {
+        final ByteBuffer uBuffer = cPage.getBuffer();
+        long expectedBytesWritten = 0;
+        long bytesWritten = 0;
+        for (int i = 1; i < totalPages; i++) {
+            fixBufferPointers(uBuffer, i);
+            cBuffer.position(0);
+
+            final ByteBuffer writeBuffer;
+            if (compressToWriteBuffer(uBuffer, cBuffer) < bufferCache.getPageSize()) {
+                writeBuffer = cBuffer;
+            } else {
+                writeBuffer = uBuffer;
+            }
+            final int length = writeBuffer.remaining();
+            final long offset = compressedFileManager.writeExtraPageInfo(extraBlockPageId, length, i - 1);
+            expectedBytesWritten += length;
+            bytesWritten += writeToFile(writeBuffer, offset);
+        }
+
+        verifyBytesWritten(expectedBytesWritten, bytesWritten);
+
+    }
+
+    @Override
+    public void open(FileReference fileRef) throws HyracksDataException {
+        final CompressedFileReference cFileRef = (CompressedFileReference) fileRef;
+        final int lafFileId = bufferCache.openFile(cFileRef.getLAFFileReference());
+
+        compressedFileManager = new CompressedFileManager(bufferCache, lafFileId, cFileRef);
+        compressedFileManager.open();
+        super.open(fileRef);
+    }
+
+    /**
+     * Decrements the reference counter for the LAF file.
+     * It is up to {@link BufferCache} to physically close the file;
+     * see {@link BufferCache#deleteFile(FileReference)} and {@link BufferCache#purgeHandle(int)}.
+     */
+    @Override
+    public void close() throws HyracksDataException {
+        if (hasBeenOpened()) {
+            compressedFileManager.close();
+            bufferCache.closeFile(compressedFileManager.getFileId());
+        }
+        super.close();
+    }
+
+    @Override
+    public void purge() throws HyracksDataException {
+        super.purge();
+        compressedFileManager.close();
+        bufferCache.closeFile(compressedFileManager.getFileId());
+        bufferCache.purgeHandle(compressedFileManager.getFileId());
+    }
+
+    @Override
+    public void markAsDeleted() throws HyracksDataException {
+        if (hasBeenOpened()) {
+            bufferCache.deleteFile(compressedFileManager.getFileId());
+            compressedFileManager = null;
+        } else {
+            bufferCache.deleteFile(lafFileRef);
+        }
+        super.markAsDeleted();
+    }
+
+    @Override
+    public void force(boolean metadata) throws HyracksDataException {
+        super.force(metadata);
+        bufferCache.force(compressedFileManager.getFileId(), metadata);
+    }
+
+    @Override
+    public int getNumberOfPages() {
+        return compressedFileManager.getNumberOfPages();
+    }
+
+    @Override
+    protected long getFirstPageOffset(CachedPage cPage) {
+        return cPage.getCompressedPageOffset();
+    }
+
+    @Override
+    protected long getExtraPageOffset(CachedPage cPage) {
+        return getFirstPageOffset(cPage);
+    }
+
+    @Override
+    public ICompressedPageWriter getCompressedPageWriter() {
+        return compressedFileManager.getCompressedPageWriter();
+    }
+
+    /* ********************************
+     * Compression methods
+     * ********************************
+     */
+
+    private void fixBufferPointers(ByteBuffer uBuffer, int i) {
+        // Position the uncompressed buffer at the start of the i-th page within the frame
+        uBuffer.position(bufferCache.getPageSize() * i);
+        // and limit it to one page worth of data from that position
+        uBuffer.limit(uBuffer.position() + bufferCache.getPageSize());
+    }
+
+    private void uncompressToPageBuffer(ByteBuffer cBuffer, ByteBuffer uBuffer) throws HyracksDataException {
+        final ICompressorDecompressor compDecomp = compressedFileManager.getCompressorDecompressor();
+        compDecomp.uncompress(cBuffer, uBuffer);
+        verifyUncompressionSize(bufferCache.getPageSize(), uBuffer.remaining());
+    }
+
+    private int compressToWriteBuffer(ByteBuffer uBuffer, ByteBuffer cBuffer) throws HyracksDataException {
+        final ICompressorDecompressor compDecomp = compressedFileManager.getCompressorDecompressor();
+        compDecomp.compress(uBuffer, cBuffer);
+        return cBuffer.remaining();
+    }
+
+    private int getRequiredBufferSize() {
+        final ICompressorDecompressor compDecomp = compressedFileManager.getCompressorDecompressor();
+        return compDecomp.computeCompressedBufferSize(bufferCache.getPageSize());
+    }
+
+    private void verifyUncompressionSize(int expected, int actual) {
+        if (expected != actual) {
+            throwException("Uncompressed", expected, actual);
+        }
+    }
+
+}
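
Editor's note: the write path above compresses each page into a scratch buffer and persists the compressed bytes only when they are actually smaller than the uncompressed page; otherwise it writes the raw page, and the read path mirrors this by comparing the recorded compressed size against the full page size before uncompressing. The following is a condensed, self-contained sketch of that decision only, using java.util.zip.Deflater as a stand-in for ICompressorDecompressor and byte arrays as stand-ins for cached pages; all names here are hypothetical and are not the patch's API.

    import java.util.Random;
    import java.util.zip.Deflater;

    public final class PageCompressionSketch {

        // Returns the bytes that would be persisted for one page: the compressed form when it
        // saves space, the raw page otherwise (mirroring the branch in write()).
        static byte[] choosePayload(byte[] uncompressedPage) {
            Deflater deflater = new Deflater();
            deflater.setInput(uncompressedPage);
            deflater.finish();
            // Scratch buffer with headroom, analogous to computeCompressedBufferSize(pageSize).
            byte[] scratch = new byte[uncompressedPage.length + 64];
            int compressedLength = 0;
            while (!deflater.finished() && compressedLength < scratch.length) {
                compressedLength += deflater.deflate(scratch, compressedLength, scratch.length - compressedLength);
            }
            boolean fitInScratch = deflater.finished();
            deflater.end();
            if (fitInScratch && compressedLength < uncompressedPage.length) {
                byte[] compressed = new byte[compressedLength];
                System.arraycopy(scratch, 0, compressed, 0, compressedLength);
                return compressed; // savings: persist the compressed form and record its length
            }
            return uncompressedPage; // no savings: persist the page uncompressed
        }

        public static void main(String[] args) {
            byte[] zeros = new byte[4096]; // highly compressible
            byte[] noise = new byte[4096]; // effectively incompressible
            new Random(0).nextBytes(noise);
            System.out.println("zeros -> " + choosePayload(zeros).length + " bytes persisted");
            System.out.println("noise -> " + choosePayload(noise).length + " bytes persisted");
        }
    }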

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeExamplesTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeExamplesTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeExamplesTest.java
index aee7e90..b8727b0 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeExamplesTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeExamplesTest.java
@@ -37,6 +37,7 @@ import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTree;
 import org.apache.hyracks.storage.am.lsm.btree.util.LSMBTreeTestHarness;
 import org.apache.hyracks.storage.am.lsm.btree.utils.LSMBTreeUtil;
 import org.apache.hyracks.storage.common.IIndexAccessor;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
 import org.apache.hyracks.util.trace.ITracer;
 import org.junit.After;
 import org.junit.Before;
@@ -62,7 +63,8 @@ public class LSMBTreeExamplesTest extends OrderedIndexExamplesTest {
                 bloomFilterKeyFields, harness.getBoomFilterFalsePositiveRate(), harness.getMergePolicy(),
                 harness.getOperationTracker(), harness.getIOScheduler(), harness.getIOOperationCallbackFactory(), true,
                 filterTypeTraits, filterCmpFactories, btreeFields, filterFields, true,
-                harness.getMetadataPageManagerFactory(), false, ITracer.NONE);
+                harness.getMetadataPageManagerFactory(), false, ITracer.NONE,
+                NoOpCompressorDecompressorFactory.INSTANCE);
     }
 
     @Before

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java
index 4fafb38..d83f475 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java
@@ -31,6 +31,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.impls.NoOpOperationTrackerFactory;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
 import org.apache.hyracks.util.trace.ITracer;
 import org.junit.Test;
 
@@ -52,7 +53,8 @@ public class LSMBTreeModificationOperationCallbackTest extends AbstractModificat
                 harness.getBoomFilterFalsePositiveRate(), harness.getMergePolicy(),
                 NoOpOperationTrackerFactory.INSTANCE.getOperationTracker(null, null), harness.getIOScheduler(),
                 harness.getIOOperationCallbackFactory(), true, null, null, null, null, true,
-                harness.getMetadataPageManagerFactory(), false, ITracer.NONE);
+                harness.getMetadataPageManagerFactory(), false, ITracer.NONE,
+                NoOpCompressorDecompressorFactory.INSTANCE);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
index 3dfb369..59c9ebb 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
@@ -40,6 +40,7 @@ import org.apache.hyracks.storage.common.IIndexAccessor;
 import org.apache.hyracks.storage.common.IIndexBulkLoader;
 import org.apache.hyracks.storage.common.IIndexCursor;
 import org.apache.hyracks.storage.common.ISearchOperationCallback;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
 import org.apache.hyracks.util.trace.ITracer;
 import org.junit.Assert;
 import org.junit.Test;
@@ -61,7 +62,8 @@ public class LSMBTreeSearchOperationCallbackTest extends AbstractSearchOperation
                 harness.getBoomFilterFalsePositiveRate(), harness.getMergePolicy(),
                 NoOpOperationTrackerFactory.INSTANCE.getOperationTracker(null, null), harness.getIOScheduler(),
                 harness.getIOOperationCallbackFactory(), true, null, null, null, null, true,
-                harness.getMetadataPageManagerFactory(), false, ITracer.NONE);
+                harness.getMetadataPageManagerFactory(), false, ITracer.NONE,
+                NoOpCompressorDecompressorFactory.INSTANCE);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java
index 0914541..c14474b 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java
@@ -41,6 +41,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.impls.NoOpOperationTrackerFactory;
 import org.apache.hyracks.storage.common.IIndexAccessor;
 import org.apache.hyracks.storage.common.IModificationOperationCallback;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
 import org.apache.hyracks.util.trace.ITracer;
 import org.junit.After;
 import org.junit.Assert;
@@ -74,7 +75,8 @@ public class LSMBTreeUpdateInPlaceTest extends AbstractOperationCallbackTest {
                 harness.getBoomFilterFalsePositiveRate(), harness.getMergePolicy(),
                 NoOpOperationTrackerFactory.INSTANCE.getOperationTracker(null, null), harness.getIOScheduler(),
                 harness.getIOOperationCallbackFactory(), true, null, null, null, null, true,
-                harness.getMetadataPageManagerFactory(), true, ITracer.NONE);
+                harness.getMetadataPageManagerFactory(), true, ITracer.NONE,
+                NoOpCompressorDecompressorFactory.INSTANCE);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java
index b25a229..baa95e9 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java
@@ -39,6 +39,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
 import org.apache.hyracks.storage.common.IStorageManager;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -57,12 +58,13 @@ public class TestLsmBtreeLocalResource extends LSMBTreeLocalResource {
         super(typeTraits, cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate, isPrimary, path,
                 storageManager, mergePolicyFactory, mergePolicyProperties, filterTypeTraits, filterCmpFactories,
                 btreeFields, filterFields, opTrackerProvider, ioOpCallbackFactory, metadataPageManagerFactory,
-                vbcProvider, ioSchedulerProvider, durable);
+                vbcProvider, ioSchedulerProvider, durable, NoOpCompressorDecompressorFactory.INSTANCE);
     }
 
     protected TestLsmBtreeLocalResource(IPersistedResourceRegistry registry, JsonNode json, int[] bloomFilterKeyFields,
             double bloomFilterFalsePositiveRate, boolean isPrimary, int[] btreeFields) throws HyracksDataException {
-        super(registry, json, bloomFilterKeyFields, bloomFilterFalsePositiveRate, isPrimary, btreeFields);
+        super(registry, json, bloomFilterKeyFields, bloomFilterFalsePositiveRate, isPrimary, btreeFields,
+                NoOpCompressorDecompressorFactory.INSTANCE);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResourceFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResourceFactory.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResourceFactory.java
index 6b13f56..3d7c520 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResourceFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResourceFactory.java
@@ -32,6 +32,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
 import org.apache.hyracks.storage.am.lsm.common.dataflow.LsmResource;
 import org.apache.hyracks.storage.common.IStorageManager;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
 
 public class TestLsmBtreeLocalResourceFactory extends LSMBTreeLocalResourceFactory {
     private static final long serialVersionUID = 1L;
@@ -47,7 +48,7 @@ public class TestLsmBtreeLocalResourceFactory extends LSMBTreeLocalResourceFacto
         super(storageManager, typeTraits, cmpFactories, filterTypeTraits, filterCmpFactories, filterFields,
                 opTrackerFactory, ioOpCallbackFactory, metadataPageManagerFactory, vbcProvider, ioSchedulerProvider,
                 mergePolicyFactory, mergePolicyProperties, durable, bloomFilterKeyFields, bloomFilterFalsePositiveRate,
-                isPrimary, btreeFields);
+                isPrimary, btreeFields, NoOpCompressorDecompressorFactory.INSTANCE);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/multithread/LSMBTreeMultiThreadTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/multithread/LSMBTreeMultiThreadTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/multithread/LSMBTreeMultiThreadTest.java
index 6950f86..85038c7 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/multithread/LSMBTreeMultiThreadTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/multithread/LSMBTreeMultiThreadTest.java
@@ -32,6 +32,7 @@ import org.apache.hyracks.storage.am.common.api.ITreeIndex;
 import org.apache.hyracks.storage.am.common.datagen.ProbabilityHelper;
 import org.apache.hyracks.storage.am.lsm.btree.util.LSMBTreeTestHarness;
 import org.apache.hyracks.storage.am.lsm.btree.utils.LSMBTreeUtil;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
 import org.apache.hyracks.util.trace.ITracer;
 
 public class LSMBTreeMultiThreadTest extends OrderedIndexMultiThreadTest {
@@ -57,7 +58,8 @@ public class LSMBTreeMultiThreadTest extends OrderedIndexMultiThreadTest {
                 harness.getFileReference(), harness.getDiskBufferCache(), typeTraits, cmpFactories,
                 bloomFilterKeyFields, harness.getBoomFilterFalsePositiveRate(), harness.getMergePolicy(),
                 harness.getOperationTracker(), harness.getIOScheduler(), harness.getIOOperationCallbackFactory(), true,
-                null, null, null, null, true, harness.getMetadataPageManagerFactory(), false, ITracer.NONE);
+                null, null, null, null, true, harness.getMetadataPageManagerFactory(), false, ITracer.NONE,
+                NoOpCompressorDecompressorFactory.INSTANCE);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
index 2462c85..3bb1fa4 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
@@ -36,9 +36,9 @@ import org.apache.hyracks.storage.am.common.datagen.TupleBatch;
 import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
 import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTree;
 import org.apache.hyracks.storage.am.lsm.btree.utils.LSMBTreeUtil;
+import org.apache.hyracks.storage.am.lsm.common.api.IIoOperationFailedCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
-import org.apache.hyracks.storage.am.lsm.common.api.IIoOperationFailedCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 import org.apache.hyracks.storage.am.lsm.common.impls.AsynchronousScheduler;
 import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
@@ -48,6 +48,7 @@ import org.apache.hyracks.storage.am.lsm.common.impls.VirtualBufferCache;
 import org.apache.hyracks.storage.common.IIndexAccessor;
 import org.apache.hyracks.storage.common.buffercache.HeapBufferAllocator;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
 import org.apache.hyracks.test.support.TestStorageManagerComponentHolder;
 import org.apache.hyracks.test.support.TestUtils;
 import org.apache.hyracks.util.ExitUtil;
@@ -125,7 +126,7 @@ public class LSMTreeRunner implements IExperimentRunner {
                 cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate, new NoMergePolicy(),
                 new ThreadCountingTracker(), ioScheduler, NoOpIOOperationCallbackFactory.INSTANCE, true, null, null,
                 null, null, true, TestStorageManagerComponentHolder.getMetadataPageManagerFactory(), false,
-                ITracer.NONE);
+                ITracer.NONE, NoOpCompressorDecompressorFactory.INSTANCE);
     }
 
     @Override