You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2018/11/07 07:15:47 UTC
[02/10] asterixdb git commit: [ASTERIXDB-2422][STO] Introduce
compressed storage
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCacheHeaderHelper.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCacheHeaderHelper.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCacheHeaderHelper.java
new file mode 100644
index 0000000..a913513
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCacheHeaderHelper.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.buffercache;
+
+import static org.apache.hyracks.storage.common.buffercache.IBufferCache.RESERVED_HEADER_BYTES;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.compression.ICompressorDecompressor;
+
+/**
+ * Helper that owns a reusable scratch {@link ByteBuffer} whose first
+ * {@link IBufferCache#RESERVED_HEADER_BYTES} bytes hold a page header made of two ints:
+ * the frame size multiplier (offset 0) and the extra block page id (offset 4).
+ */
+public class BufferCacheHeaderHelper {
+    private static final int FRAME_MULTIPLIER_OFF = 0;
+    private static final int EXTRA_BLOCK_PAGE_ID_OFF = FRAME_MULTIPLIER_OFF + 4; // 4
+
+    // array[0] is always the current scratch buffer; array[1] is set per call to the page's own buffer
+    private final ByteBuffer[] array;
+    private final int pageSizeWithHeader;
+    private ByteBuffer buf;
+
+    /**
+     * Allocates a scratch buffer of {@code RESERVED_HEADER_BYTES + pageSize} bytes.
+     *
+     * @param pageSize
+     *            page size, excluding the reserved header
+     */
+    public BufferCacheHeaderHelper(int pageSize) {
+        this.pageSizeWithHeader = RESERVED_HEADER_BYTES + pageSize;
+        buf = ByteBuffer.allocate(pageSizeWithHeader);
+        array = new ByteBuffer[] { buf, null };
+    }
+
+    /**
+     * Writes the header of {@code cPage} into the scratch buffer and pairs it with the page's own buffer.
+     *
+     * @param cPage
+     *            the page whose header values are written
+     * @return a two-element array: the scratch buffer limited to the header bytes, followed by
+     *         {@code cPage.buffer}. The same array instance is returned on every call.
+     */
+    public ByteBuffer[] prepareWrite(CachedPage cPage) {
+        setPageInfo(cPage);
+        buf.position(0);
+        buf.limit(RESERVED_HEADER_BYTES);
+        array[1] = cPage.buffer;
+        return array;
+    }
+
+    /**
+     * Writes the header of {@code cPage} into the scratch buffer and exposes the space after the header.
+     *
+     * @param cPage
+     *            the page whose header values are written
+     * @param requiredSize
+     *            number of bytes needed after the header; the scratch buffer grows if it is too small
+     * @return the scratch buffer positioned right after the header, with its limit at its capacity
+     */
+    public ByteBuffer prepareWrite(CachedPage cPage, int requiredSize) {
+        // Grow first: a reallocation discards the old content, so the header must be written afterwards
+        ensureBufferCapacity(requiredSize);
+        setPageInfo(cPage);
+        buf.position(RESERVED_HEADER_BYTES);
+        buf.limit(buf.capacity());
+        return buf;
+    }
+
+    /**
+     * Resets the scratch buffer so that {@code size} bytes can be read into it from its start.
+     * The buffer is not grown here; {@code size} must not exceed its current capacity.
+     *
+     * @param size
+     *            the new limit of the scratch buffer
+     * @return the scratch buffer with position 0 and limit {@code size}
+     */
+    public ByteBuffer prepareRead(int size) {
+        buf.position(0);
+        buf.limit(size);
+        return buf;
+    }
+
+    /**
+     * Copies the header values held in the scratch buffer into {@code cPage}.
+     *
+     * @param cPage
+     *            the page that receives the frame size multiplier and the extra block page id
+     * @return the scratch buffer positioned right after the header
+     */
+    public ByteBuffer processHeader(CachedPage cPage) {
+        cPage.setFrameSizeMultiplier(buf.getInt(FRAME_MULTIPLIER_OFF));
+        cPage.setExtraBlockPageId(buf.getInt(EXTRA_BLOCK_PAGE_ID_OFF));
+        buf.position(RESERVED_HEADER_BYTES);
+        return buf;
+    }
+
+    /**
+     * Writes the header values of {@code cPage} at their absolute offsets; the buffer position is untouched.
+     */
+    private void setPageInfo(CachedPage cPage) {
+        buf.putInt(FRAME_MULTIPLIER_OFF, cPage.getFrameSizeMultiplier());
+        buf.putInt(EXTRA_BLOCK_PAGE_ID_OFF, cPage.getExtraBlockPageId());
+    }
+
+    /**
+     * {@link ICompressorDecompressor#compress(ByteBuffer, ByteBuffer)} may require additional
+     * space to do the compression. See {@link ICompressorDecompressor#computeCompressedBufferSize(int)}.
+     * If the scratch buffer cannot hold {@code size} bytes plus the reserved header, a larger one is
+     * allocated (the previous content is not copied). The limit is reset to the capacity in either case.
+     *
+     * @param size
+     *            number of bytes required after the reserved header
+     */
+    private void ensureBufferCapacity(int size) {
+        final int requiredSize = size + RESERVED_HEADER_BYTES;
+        if (buf.capacity() < requiredSize) {
+            buf = ByteBuffer.allocate(requiredSize);
+            // keep the gathering-write array pointing at the live buffer
+            array[0] = buf;
+        }
+        buf.limit(buf.capacity());
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
index 6ec12aa..cf0553c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
@@ -45,6 +45,8 @@ public class CachedPage implements ICachedPageInternal {
private IQueueInfo queueInfo;
private int multiplier;
private int extraBlockPageId;
+ private long compressedOffset;
+ private int compressedSize;
// DEBUG
private static final boolean DEBUG = false;
private final StackTraceElement[] ctorStack;
@@ -224,4 +226,23 @@ public class CachedPage implements ICachedPageInternal {
LOGGER.error("An IO Failure took place but the failure callback is not set", e);
}
}
+
+    /**
+     * Records the offset of this page's compressed form.
+     *
+     * @param offset
+     *            the compressed page offset
+     */
+    @Override
+    public void setCompressedPageOffset(long offset) {
+        this.compressedOffset = offset;
+    }
+
+    /**
+     * @return the offset last stored through {@link #setCompressedPageOffset(long)}
+     */
+    @Override
+    public long getCompressedPageOffset() {
+        return this.compressedOffset;
+    }
+
+    /**
+     * Records the size of this page's compressed form.
+     *
+     * @param size
+     *            the compressed page size
+     */
+    @Override
+    public void setCompressedPageSize(int size) {
+        compressedSize = size;
+    }
+
+    /**
+     * @return the size last stored through {@link #setCompressedPageSize(int)}
+     */
+    @Override
+    public int getCompressedPageSize() {
+        return this.compressedSize;
+    }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IBufferCache.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IBufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IBufferCache.java
index 21d3677..c762dd9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IBufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IBufferCache.java
@@ -21,6 +21,7 @@ package org.apache.hyracks.storage.common.buffercache;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.replication.IIOReplicationManager;
+import org.apache.hyracks.storage.common.compression.file.ICompressedPageWriter;
public interface IBufferCache {
@@ -278,4 +279,11 @@ public interface IBufferCache {
*/
void closeFileIfOpen(FileReference fileRef);
+ /**
+ * @return compressed page writer
+ */
+ default ICompressedPageWriter getCompressedPageWriter(int fileId) {
+ throw new UnsupportedOperationException(this.getClass().getName() + " does not support compressed pages");
+ }
+
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPageInternal.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPageInternal.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPageInternal.java
index d900852..04e93db 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPageInternal.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPageInternal.java
@@ -35,4 +35,12 @@ public interface ICachedPageInternal extends ICachedPage {
int getExtraBlockPageId();
void setExtraBlockPageId(int extraBlockPageId);
+
+ /**
+  * Records the offset of this page's compressed form.
+  * NOTE(review): presumably a byte offset into the compressed data file -- confirm against the buffer cache.
+  *
+  * @param offset
+  *            the compressed page offset
+  */
+ void setCompressedPageOffset(long offset);
+
+ /**
+  * @return the compressed page offset
+  */
+ long getCompressedPageOffset();
+
+ /**
+  * Records the size of this page's compressed form.
+  *
+  * @param size
+  *            the compressed page size
+  */
+ void setCompressedPageSize(int size);
+
+ /**
+  * @return the compressed page size
+  */
+ int getCompressedPageSize();
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressor.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressor.java
new file mode 100644
index 0000000..c4855bd
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.compression;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.compression.ICompressorDecompressor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Pass-through {@link ICompressorDecompressor}: both directions hand back the input buffer untouched
+ * and the destination buffer is never used.
+ */
+public class NoOpCompressorDecompressor implements ICompressorDecompressor {
+    public static final NoOpCompressorDecompressor INSTANCE = new NoOpCompressorDecompressor();
+
+    // Singleton: use INSTANCE
+    private NoOpCompressorDecompressor() {
+    }
+
+    /**
+     * @return always {@code 0}, regardless of {@code uBufferSize}
+     */
+    @Override
+    public int computeCompressedBufferSize(int uBufferSize) {
+        return 0;
+    }
+
+    /**
+     * @return {@code uBuffer} itself; {@code cBuffer} is ignored
+     */
+    @Override
+    public ByteBuffer compress(ByteBuffer uBuffer, ByteBuffer cBuffer) throws HyracksDataException {
+        final ByteBuffer passThrough = uBuffer;
+        return passThrough;
+    }
+
+    /**
+     * @return {@code cBuffer} itself; {@code uBuffer} is ignored
+     */
+    @Override
+    public ByteBuffer uncompress(ByteBuffer cBuffer, ByteBuffer uBuffer) throws HyracksDataException {
+        final ByteBuffer passThrough = cBuffer;
+        return passThrough;
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressorFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressorFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressorFactory.java
new file mode 100644
index 0000000..690f4a2
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressorFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.compression;
+
+import org.apache.hyracks.api.compression.ICompressorDecompressor;
+import org.apache.hyracks.api.compression.ICompressorDecompressorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IJsonSerializable;
+import org.apache.hyracks.api.io.IPersistedResourceRegistry;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Factory that always hands out the shared {@link NoOpCompressorDecompressor#INSTANCE}.
+ */
+public class NoOpCompressorDecompressorFactory implements ICompressorDecompressorFactory {
+    private static final long serialVersionUID = 1L;
+    public static final ICompressorDecompressorFactory INSTANCE = new NoOpCompressorDecompressorFactory();
+
+    /**
+     * @return the shared no-op compressor/decompressor
+     */
+    @Override
+    public ICompressorDecompressor createInstance() {
+        return NoOpCompressorDecompressor.INSTANCE;
+    }
+
+    /**
+     * Serializes this factory as its class identifier only; it carries no other state.
+     */
+    @Override
+    public JsonNode toJson(IPersistedResourceRegistry registry) throws HyracksDataException {
+        final JsonNode classIdentifier = registry.getClassIdentifier(this.getClass(), serialVersionUID);
+        return classIdentifier;
+    }
+
+    /**
+     * Deserialization hook; both arguments are ignored and the shared {@link #INSTANCE} is returned.
+     */
+    @SuppressWarnings("squid:S1172") // unused parameter
+    public static IJsonSerializable fromJson(IPersistedResourceRegistry registry, JsonNode json) {
+        return INSTANCE;
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/SnappyCompressorDecompressor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/SnappyCompressorDecompressor.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/SnappyCompressorDecompressor.java
new file mode 100644
index 0000000..16c9a2d
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/SnappyCompressorDecompressor.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.compression;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.compression.ICompressorDecompressor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.xerial.snappy.Snappy;
+
+/**
+ * Built-in Snappy compressor/decompressor wrapper.
+ * Both directions operate on the backing arrays of the given buffers, so the buffers must be
+ * array-backed (e.g. created with {@link ByteBuffer#allocate(int)} or {@link ByteBuffer#wrap(byte[])}).
+ */
+public class SnappyCompressorDecompressor implements ICompressorDecompressor {
+    protected static final SnappyCompressorDecompressor INSTANCE = new SnappyCompressorDecompressor();
+
+    // Singleton: use INSTANCE
+    private SnappyCompressorDecompressor() {
+
+    }
+
+    /**
+     * @param uncompressedBufferSize
+     *            size of the data to be compressed
+     * @return the value of {@link Snappy#maxCompressedLength(int)} for the given size
+     */
+    @Override
+    public int computeCompressedBufferSize(int uncompressedBufferSize) {
+        return Snappy.maxCompressedLength(uncompressedBufferSize);
+    }
+
+    /**
+     * Compresses the remaining bytes of {@code uBuffer} into {@code cBuffer}, starting at
+     * {@code cBuffer}'s current position. Positions of both buffers are left unchanged.
+     *
+     * @param uBuffer
+     *            source of the uncompressed bytes (position .. limit)
+     * @param cBuffer
+     *            destination of the compressed bytes
+     * @return {@code cBuffer} with its limit set to the end of the compressed data
+     * @throws HyracksDataException
+     *             wrapping any {@link IOException} raised by Snappy
+     */
+    @Override
+    public ByteBuffer compress(ByteBuffer uBuffer, ByteBuffer cBuffer) throws HyracksDataException {
+        try {
+            // array() exposes the whole backing array: buffer index i lives at arrayOffset() + i,
+            // which differs from i for sliced or offset-wrapped buffers.
+            final int cLength = Snappy.compress(uBuffer.array(), uBuffer.arrayOffset() + uBuffer.position(),
+                    uBuffer.remaining(), cBuffer.array(), cBuffer.arrayOffset() + cBuffer.position());
+            cBuffer.limit(cBuffer.position() + cLength);
+            return cBuffer;
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    /**
+     * Uncompresses the remaining bytes of {@code cBuffer} into {@code uBuffer}, starting at
+     * {@code uBuffer}'s current position. Positions of both buffers are left unchanged.
+     *
+     * @param cBuffer
+     *            source of the compressed bytes (position .. limit)
+     * @param uBuffer
+     *            destination of the uncompressed bytes
+     * @return {@code uBuffer} with its limit set to the end of the uncompressed data
+     * @throws HyracksDataException
+     *             wrapping any {@link IOException} raised by Snappy
+     */
+    @Override
+    public ByteBuffer uncompress(ByteBuffer cBuffer, ByteBuffer uBuffer) throws HyracksDataException {
+        try {
+            // Same backing-array translation as in compress()
+            final int uLength = Snappy.uncompress(cBuffer.array(), cBuffer.arrayOffset() + cBuffer.position(),
+                    cBuffer.remaining(), uBuffer.array(), uBuffer.arrayOffset() + uBuffer.position());
+            uBuffer.limit(uBuffer.position() + uLength);
+            return uBuffer;
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/SnappyCompressorDecompressorFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/SnappyCompressorDecompressorFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/SnappyCompressorDecompressorFactory.java
new file mode 100644
index 0000000..93f31bd
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/SnappyCompressorDecompressorFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.compression;
+
+import org.apache.hyracks.api.compression.ICompressorDecompressor;
+import org.apache.hyracks.api.compression.ICompressorDecompressorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IJsonSerializable;
+import org.apache.hyracks.api.io.IPersistedResourceRegistry;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Factory that always hands out the shared {@link SnappyCompressorDecompressor} instance.
+ */
+public class SnappyCompressorDecompressorFactory implements ICompressorDecompressorFactory {
+    private static final long serialVersionUID = 1L;
+    private static final ICompressorDecompressorFactory INSTANCE = new SnappyCompressorDecompressorFactory();
+
+    /**
+     * @return the shared Snappy compressor/decompressor
+     */
+    @Override
+    public ICompressorDecompressor createInstance() {
+        return SnappyCompressorDecompressor.INSTANCE;
+    }
+
+    /**
+     * Serializes this factory as its class identifier only; it carries no other state.
+     */
+    @Override
+    public JsonNode toJson(IPersistedResourceRegistry registry) throws HyracksDataException {
+        final JsonNode classIdentifier = registry.getClassIdentifier(this.getClass(), serialVersionUID);
+        return classIdentifier;
+    }
+
+    /**
+     * Deserialization hook; both arguments are ignored and the shared factory instance is returned.
+     */
+    @SuppressWarnings("squid:S1172") // unused parameter
+    public static IJsonSerializable fromJson(IPersistedResourceRegistry registry, JsonNode json) {
+        return INSTANCE;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileManager.java
new file mode 100644
index 0000000..70d1388
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileManager.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.compression.file;
+
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+
+import org.apache.hyracks.api.compression.ICompressorDecompressor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.buffercache.ICachedPageInternal;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+
+/**
+ * CompressedFileManager is responsible to manage the Look Aside File (LAF file), which contains
+ * the compression information. LAF file format is as follow:
+ *
+ * [<offset0, size0>, <offset1, size1> .... <offsetN, sizeN>]
+ * Each entry <offsetM, sizeM> is an entry of 16-bytes for page M (8 bytes for offset and 8 for size).
+ *
+ * The file is responsible to store the beginning and the size of each page after compression.
+ */
+public class CompressedFileManager {
+ protected static final int SIZE_ENTRY_OFFSET = 8; // 0 is for the compressed page offset
+ protected static final int ENTRY_LENGTH = 16; //<offset(8-bytes),size(8-bytes)>
+ protected static final int EOF = -1;
+ private static final EnumSet<State> CLOSED = EnumSet.of(State.CLOSED);
+ private static final EnumSet<State> READABLE_WRITABLE = EnumSet.of(State.READABLE, State.WRITABLE);
+ private static final EnumSet<State> READABLE = EnumSet.of(State.READABLE);
+ private static final EnumSet<State> WRITABLE = EnumSet.of(State.WRITABLE);
+
+ private enum State {
+ READABLE,
+ WRITABLE,
+ CLOSED
+ }
+
+ private final IBufferCache bufferCache;
+ private final int fileId;
+ private final ICompressorDecompressor compressorDecompressor;
+
+ private State state;
+ private int totalNumOfPages;
+
+ private LAFWriter lafWriter;
+
+ public CompressedFileManager(IBufferCache bufferCache, int fileId, CompressedFileReference fileRef) {
+ state = State.CLOSED;
+ totalNumOfPages = 0;
+ this.bufferCache = bufferCache;
+ this.fileId = fileId;
+ this.compressorDecompressor = fileRef.getCompressorDecompressor();
+ }
+
+ /**
+ * If the file is empty (i.e. the number of pages is zero)
+ * Then the state will be WRITABLE.
+ *
+ * @throws HyracksDataException
+ */
+ public void open() throws HyracksDataException {
+ ensureState(CLOSED);
+ changeToFunctionalState();
+ }
+
+ /**
+ * Close the LAF file.
+ *
+ * @throws HyracksDataException
+ */
+ public void close() {
+ ensureState(READABLE_WRITABLE);
+ state = State.CLOSED;
+ }
+
+ /* ************************
+ * LAF writing methods
+ * ************************
+ */
+
+ public ICompressedPageWriter getCompressedPageWriter() {
+ ensureState(WRITABLE);
+ return lafWriter;
+ }
+
+ /**
+ * Add page information (offset, size) after compression.
+ *
+ * @param dpid
+ * @param size
+ * @return offset for the compressed page.
+ * @throws HyracksDataException
+ */
+ public long writePageInfo(long dpid, long size) throws HyracksDataException {
+ final int pageId = BufferedFileHandle.getPageId(dpid);
+ //Write the page (extraPageIndex = 0)
+ return writeExtraPageInfo(pageId, size, 0);
+ }
+
+ /**
+ * Add extra page information (offset, size) after compression.
+ *
+ * @param extraPageId
+ * extra page ID
+ * @param size
+ * size of the extra page
+ * @param extraPageIndex
+ * the index of the extra page (starting from 0)
+ * @return offset for the compressed page.
+ * @throws HyracksDataException
+ */
+ public long writeExtraPageInfo(int extraPageId, long size, int extraPageIndex) throws HyracksDataException {
+ ensureState(WRITABLE);
+
+ final long compressedPageOffset;
+ try {
+ compressedPageOffset = lafWriter.writePageInfo(extraPageId + extraPageIndex, size);
+ } catch (HyracksDataException e) {
+ lafWriter.abort();
+ throw e;
+ }
+
+ return compressedPageOffset;
+ }
+
+ /**
+ * This methods is used by {@link LAFWriter#endWriting()} to signal the end of writing.
+ * After calling this methods, LAF file will be READ-ONLY.
+ *
+ * @param totalNumOfPages
+ * The total number of pages of the index
+ * @throws HyracksDataException
+ */
+ void endWriting(int totalNumOfPages) {
+ ensureState(WRITABLE);
+ this.totalNumOfPages = totalNumOfPages;
+ lafWriter = null;
+ state = State.READABLE;
+ }
+
+ /* ************************
+ * LAF reading methods
+ * ************************
+ */
+
+ /**
+ * Set the compressed page offset and size
+ *
+ * @param compressedPage
+ * @throws HyracksDataException
+ */
+ public void setCompressedPageInfo(ICachedPageInternal compressedPage) throws HyracksDataException {
+ setCompressedPageInfo(BufferedFileHandle.getPageId(compressedPage.getDiskPageId()), compressedPage);
+ }
+
+ /**
+ * Set the extra compressed page offset and size
+ *
+ * @param compressedPage
+ * @param extraPageIndex
+ * @throws HyracksDataException
+ */
+ public void setExtraCompressedPageInfo(ICachedPageInternal compressedPage, int extraPageIndex)
+ throws HyracksDataException {
+ setCompressedPageInfo(compressedPage.getExtraBlockPageId() + extraPageIndex, compressedPage);
+ }
+
+ /* ************************
+ * LAF general methods
+ * ************************
+ */
+
+ /**
+ * Get the number of compressed pages
+ *
+ * @return
+ */
+ public int getNumberOfPages() {
+ return totalNumOfPages;
+ }
+
+ public int getFileId() {
+ return fileId;
+ }
+
+ public ICompressorDecompressor getCompressorDecompressor() {
+ return compressorDecompressor;
+ }
+
+ /* ************************
+ * Private methods
+ * ************************
+ */
+
+ private void ensureState(EnumSet<State> expectedStates) {
+ if (!expectedStates.contains(state)) {
+ throw new IllegalStateException(
+ "Expecting the state to be " + expectedStates + ". Currently it is " + state);
+ }
+ }
+
+ private void changeToFunctionalState() throws HyracksDataException {
+ if (bufferCache.getNumPagesOfFile(fileId) == 0) {
+ state = State.WRITABLE;
+ lafWriter = new LAFWriter(this, bufferCache);
+ } else {
+ state = State.READABLE;
+ init();
+ }
+ }
+
+ private void init() throws HyracksDataException {
+ final int numOfPages = bufferCache.getNumPagesOfFile(fileId);
+ //Maximum number of entries in a page
+ final int numOfEntriesPerPage = bufferCache.getPageSize() / ENTRY_LENGTH;
+ //get the last page which may contain less entries than maxNumOfEntries
+ final long dpid = getDiskPageId(numOfPages - 1);
+ final ICachedPage page = bufferCache.pin(dpid, false);
+ try {
+ final ByteBuffer buf = page.getBuffer();
+
+ //Start at 1 since it is impossible to have EOF at the first entry of a page
+ int i = 1;
+ //Seek EOF and count number of entries
+ while (i < numOfEntriesPerPage && buf.getLong(i * ENTRY_LENGTH) != EOF) {
+ i++;
+ }
+
+ totalNumOfPages = (numOfPages - 1) * numOfEntriesPerPage + i;
+ } finally {
+ bufferCache.unpin(page);
+ }
+ }
+
+ private ICachedPage pinAndGetPage(int compressedPageId) throws HyracksDataException {
+ final int pageId = compressedPageId * ENTRY_LENGTH / bufferCache.getPageSize();
+ return bufferCache.pin(getDiskPageId(pageId), false);
+ }
+
+ private long getDiskPageId(int pageId) {
+ return BufferedFileHandle.getDiskPageId(fileId, pageId);
+ }
+
+ private void setCompressedPageInfo(int compressedPageId, ICachedPageInternal compressedPage)
+ throws HyracksDataException {
+ ensureState(READABLE);
+ if (totalNumOfPages == 0) {
+ /*
+ * It seems it is legal to pin empty file.
+ * Return the page information as it is not compressed.
+ */
+ compressedPage.setCompressedPageOffset(0);
+ compressedPage.setCompressedPageSize(bufferCache.getPageSize());
+ return;
+ }
+ final ICachedPage page = pinAndGetPage(compressedPageId);
+ try {
+ // No need for read latches as pages are immutable.
+ final ByteBuffer buf = page.getBuffer();
+ final int entryOffset = compressedPageId * ENTRY_LENGTH % bufferCache.getPageSize();
+ compressedPage.setCompressedPageOffset(buf.getLong(entryOffset));
+ compressedPage.setCompressedPageSize((int) buf.getLong(entryOffset + SIZE_ENTRY_OFFSET));
+ } finally {
+ bufferCache.unpin(page);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileReference.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileReference.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileReference.java
new file mode 100644
index 0000000..3fb682c
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileReference.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.compression.file;
+
+import java.util.Objects;
+
+import org.apache.hyracks.api.compression.ICompressorDecompressor;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IODeviceHandle;
+
+/**
+ * A {@link FileReference} to a compressed file, paired with the reference to its
+ * Look Aside File (LAF) and the compressor/decompressor used for its pages.
+ */
+public class CompressedFileReference extends FileReference {
+    private static final long serialVersionUID = 1L;
+
+    private final String lafPath;
+    private final FileReference lafFileRef;
+    private final transient ICompressorDecompressor compressorDecompressor;
+
+    /**
+     * @param dev
+     *            device handle shared by the data file and the LAF file
+     * @param compressorDecompressor
+     *            compressor/decompressor of this file
+     * @param path
+     *            path of the data file
+     * @param lafPath
+     *            path of the LAF file
+     */
+    public CompressedFileReference(IODeviceHandle dev, ICompressorDecompressor compressorDecompressor, String path,
+            String lafPath) {
+        super(dev, path);
+        this.compressorDecompressor = compressorDecompressor;
+        this.lafPath = lafPath;
+        this.lafFileRef = new FileReference(dev, lafPath);
+    }
+
+    /**
+     * @return the file reference of the LAF file
+     */
+    public FileReference getLAFFileReference() {
+        return lafFileRef;
+    }
+
+    /**
+     * @return the compressor/decompressor given at construction
+     */
+    public ICompressorDecompressor getCompressorDecompressor() {
+        return compressorDecompressor;
+    }
+
+    /**
+     * Deletes the LAF file first and, only if that succeeded, the data file.
+     *
+     * @return {@code true} if both deletions succeeded
+     */
+    @Override
+    public boolean delete() {
+        if (!lafFileRef.delete()) {
+            // the data file is intentionally left untouched when the LAF file could not be deleted
+            return false;
+        }
+        return super.delete();
+    }
+
+    @Override
+    public boolean isCompressed() {
+        return true;
+    }
+
+    /**
+     * Two compressed references are equal when the parent class considers them equal
+     * and their LAF paths match.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (o instanceof CompressedFileReference) {
+            final CompressedFileReference that = (CompressedFileReference) o;
+            return super.equals(o) && lafPath.equals(that.lafPath);
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        final String relativePath = super.getRelativePath();
+        return Objects.hash(relativePath, lafPath);
+    }
+
+    /**
+     * @return the relative path for LAF file
+     */
+    public String getLAFRelativePath() {
+        return lafPath;
+    }
+
+    /**
+     * @return the absolute path for LAF file
+     */
+    public String getLAFAbsolutePath() {
+        return lafFileRef.getAbsolutePath();
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/ICompressedPageWriter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/ICompressedPageWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/ICompressedPageWriter.java
new file mode 100644
index 0000000..86525b0
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/ICompressedPageWriter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.compression.file;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+
+/**
+ * An interface that exposes the Look Aside File (LAF) writer to the indexes.
+ */
+public interface ICompressedPageWriter {
+    /**
+     * Before the index can write a compressed page, the index has to prepare the writer.
+     *
+     * @param cPage
+     *            the page that is about to be written
+     * @throws HyracksDataException
+     *             if the writer cannot be prepared for the page
+     */
+    void prepareWrite(ICachedPage cPage) throws HyracksDataException;
+
+    /**
+     * Signal the writer to abort.
+     */
+    void abort();
+
+    /**
+     * Finalize the writing of the compressed pages.
+     *
+     * @throws HyracksDataException
+     *             if the writing cannot be finalized
+     */
+    void endWriting() throws HyracksDataException;
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/LAFWriter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/LAFWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/LAFWriter.java
new file mode 100644
index 0000000..dcccc52
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/LAFWriter.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.compression.file;
+
+import static org.apache.hyracks.storage.common.compression.file.CompressedFileManager.ENTRY_LENGTH;
+import static org.apache.hyracks.storage.common.compression.file.CompressedFileManager.EOF;
+import static org.apache.hyracks.storage.common.compression.file.CompressedFileManager.SIZE_ENTRY_OFFSET;
+
+import java.util.ArrayDeque;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.buffercache.ICachedPageInternal;
+import org.apache.hyracks.storage.common.buffercache.IFIFOPageQueue;
+import org.apache.hyracks.storage.common.buffercache.PageWriteFailureCallback;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.util.annotations.NotThreadSafe;
+
+/**
+ * Look Aside File (LAF) writer.
+ * This class is called by two threads simultaneously:
+ * - a thread to prepare the LAF page (bulk-loader)
+ * - and a writer thread to write the LAF page (buffer cache writer thread)
+ * Hence, it is not thread safe to have more than one thread to prepare or to write LAF pages.
+ */
+@NotThreadSafe
+class LAFWriter implements ICompressedPageWriter {
+    private final CompressedFileManager compressedFileManager;
+    private final IBufferCache bufferCache;
+    private final IFIFOPageQueue queue;
+    //LAF page ID -> frame wrapping the confiscated page. The map is also the monitor used by both threads.
+    private final Map<Integer, LAFFrame> cachedFrames;
+    //Frames whose page has been queued for writing; reused by getLAFFrame()
+    private final Queue<LAFFrame> recycledFrames;
+    private final int fileId;
+    //Number of entries that fit in one LAF page
+    private final int maxNumOfEntries;
+    private final PageWriteFailureCallback callBack;
+    //Last frame (and its LAF page ID) resolved by getPageBuffer()
+    private LAFFrame currentFrame;
+    private int currentPageId;
+    //Largest LAF page ID confiscated so far; -1 if none
+    private int maxPageId;
+
+    //Running sum of the sizes passed to writePageInfo(); used as the offset of the next page
+    private long lastOffset;
+    private int totalNumOfPages;
+
+    public LAFWriter(CompressedFileManager compressedFileManager, IBufferCache bufferCache) {
+        this.compressedFileManager = compressedFileManager;
+        this.bufferCache = bufferCache;
+        queue = bufferCache.createFIFOQueue();
+        cachedFrames = new HashMap<>();
+        recycledFrames = new ArrayDeque<>();
+        this.fileId = compressedFileManager.getFileId();
+        callBack = new PageWriteFailureCallback();
+
+        maxNumOfEntries = bufferCache.getPageSize() / ENTRY_LENGTH;
+        lastOffset = 0;
+        totalNumOfPages = 0;
+        maxPageId = -1;
+        currentPageId = -1;
+    }
+
+    /* ************************************
+     * ICompressedPageWriter methods
+     * Called by non-BufferCache thread (Bulk-loader)
+     * ************************************
+     */
+
+    /**
+     * Makes sure the LAF page(s) that will hold the entries of {@code cPage} (and of its extra pages)
+     * are confiscated from the buffer cache. On failure, the confiscated pages are returned via
+     * {@link #abort()} and the exception is rethrown.
+     */
+    @Override
+    public void prepareWrite(ICachedPage cPage) throws HyracksDataException {
+        final ICachedPageInternal internalPage = (ICachedPageInternal) cPage;
+        final int entryPageId = getLAFEntryPageId(BufferedFileHandle.getPageId(internalPage.getDiskPageId()));
+
+        synchronized (cachedFrames) {
+            if (!cachedFrames.containsKey(entryPageId)) {
+                try {
+                    //Writing new page(s). Confiscate the page(s) from the buffer cache.
+                    prepareFrames(entryPageId, internalPage);
+                } catch (HyracksDataException e) {
+                    abort();
+                    throw e;
+                }
+            }
+        }
+    }
+
+    /**
+     * Confiscates the LAF page {@code entryPageId} and any further LAF page needed by the
+     * extra pages of {@code cPage}. Called while holding the {@code cachedFrames} monitor.
+     */
+    private void prepareFrames(int entryPageId, ICachedPageInternal cPage) throws HyracksDataException {
+        //Confiscate the first page
+        confiscatePage(entryPageId);
+        //Check if the entries of the extra pages span to another LAF page
+        for (int i = 0; i < cPage.getFrameSizeMultiplier() - 1; i++) {
+            final int extraEntryPageId = getLAFEntryPageId(cPage.getExtraBlockPageId() + i);
+            if (!cachedFrames.containsKey(extraEntryPageId)) {
+                confiscatePage(extraEntryPageId);
+            }
+        }
+    }
+
+    /**
+     * Confiscates the LAF page {@code pageId} from the buffer cache, caches its frame and
+     * updates {@code maxPageId}.
+     */
+    private void confiscatePage(int pageId) throws HyracksDataException {
+        //Writing new page. Confiscate the page from the buffer cache.
+        final ICachedPage newPage = bufferCache.confiscatePage(BufferedFileHandle.getDiskPageId(fileId, pageId));
+        cachedFrames.put(pageId, getLAFFrame(newPage));
+        maxPageId = Math.max(maxPageId, pageId);
+    }
+
+    /**
+     * Returns a frame bound to {@code cPage}, reusing a recycled frame if one is available.
+     */
+    private LAFFrame getLAFFrame(ICachedPage cPage) {
+        LAFFrame lafFrame = recycledFrames.poll();
+        if (lafFrame == null) {
+            lafFrame = new LAFFrame();
+        }
+        lafFrame.setCachedPage(cPage);
+        return lafFrame;
+    }
+
+    /**
+     * Queues the remaining cached LAF pages for writing and signals the
+     * {@link CompressedFileManager} with the total number of pages written.
+     *
+     * @throws HyracksDataException
+     *             wrapping the recorded failure if a previous page write has failed
+     */
+    @Override
+    public void endWriting() throws HyracksDataException {
+        if (callBack.hasFailed()) {
+            //if write failed, return confiscated pages
+            abort();
+            throw HyracksDataException.create(callBack.getFailure());
+        }
+        synchronized (cachedFrames) {
+            final LAFFrame lastPage = cachedFrames.get(maxPageId);
+            if (lastPage != null && !lastPage.isFull()) {
+                /*
+                 * The last page may or may not be filled. If it is not filled (i.e., it does not
+                 * have the max number of entries), write an indicator after the last entry.
+                 * If it has already been written (i.e., lastPage = null), it has been filled.
+                 */
+                lastPage.setEOF();
+            }
+            for (Entry<Integer, LAFFrame> entry : cachedFrames.entrySet()) {
+                queue.put(entry.getValue().cPage, callBack);
+            }
+            bufferCache.finishQueue();
+
+            //Signal the compressedFileManager to change its state
+            compressedFileManager.endWriting(totalNumOfPages);
+        }
+    }
+
+    /**
+     * Returns every cached (confiscated) LAF page to the buffer cache.
+     */
+    @Override
+    public void abort() {
+        synchronized (cachedFrames) {
+            for (Entry<Integer, LAFFrame> frame : cachedFrames.entrySet()) {
+                bufferCache.returnPage(frame.getValue().cPage);
+            }
+        }
+    }
+
+    /* ************************************
+     * Local methods:
+     * Called by BufferCache writer thread
+     * ************************************
+     */
+
+    /**
+     * Records the offset and size of a compressed page in its LAF entry.
+     *
+     * @param pageId
+     *            ID of the compressed page
+     * @param size
+     *            number of bytes the page occupies in the compressed file
+     * @return the offset assigned to the page, i.e. the sum of the sizes recorded before it
+     * @throws HyracksDataException
+     *             if queueing a filled LAF page fails
+     */
+    public long writePageInfo(int pageId, long size) throws HyracksDataException {
+        final LAFFrame frame = getPageBuffer(pageId);
+
+        final long pageOffset = lastOffset;
+        frame.writePageInfo(pageId, pageOffset, size);
+        lastOffset += size;
+        totalNumOfPages++;
+
+        writeFullPage();
+        return pageOffset;
+    }
+
+    /**
+     * Resolves the frame of the LAF page that holds the entry of {@code compressedPageId}
+     * and remembers it as the current frame.
+     *
+     * @throws IllegalStateException
+     *             if the LAF page was not prepared; the cached pages are returned first
+     */
+    private LAFFrame getPageBuffer(int compressedPageId) {
+        final int pageId = getLAFEntryPageId(compressedPageId);
+
+        if (currentPageId == pageId) {
+            return currentFrame;
+        }
+
+        final LAFFrame frame;
+        synchronized (cachedFrames) {
+            //Check if the frame is cached
+            frame = cachedFrames.get(pageId);
+            if (frame == null) {
+                //Trying to write unprepared page
+                abort();
+                throw new IllegalStateException("Unprepared compressed-write for page ID: " + pageId);
+            }
+        }
+
+        currentFrame = frame;
+        currentPageId = pageId;
+        return frame;
+    }
+
+    /**
+     * If the current LAF page is full, queues it for writing, recycles its frame and
+     * resets the current frame.
+     */
+    private void writeFullPage() throws HyracksDataException {
+        if (currentFrame.isFull()) {
+            //The LAF page is filled. We do not need to keep it.
+            //Write it to the file and remove it from the cachedFrames map
+            queue.put(currentFrame.cPage, callBack);
+            synchronized (cachedFrames) {
+                //Recycle the frame
+                final LAFFrame frame = cachedFrames.remove(currentPageId);
+                frame.setCachedPage(null);
+                recycledFrames.add(frame);
+            }
+            currentFrame = null;
+            currentPageId = -1;
+        }
+    }
+
+    /**
+     * @return the ID of the LAF page that holds the entry of {@code compressedPageId}
+     */
+    private int getLAFEntryPageId(int compressedPageId) {
+        //Widen before multiplying: compressedPageId * ENTRY_LENGTH overflows int for large page IDs
+        return (int) ((long) compressedPageId * ENTRY_LENGTH / bufferCache.getPageSize());
+    }
+
+    /**
+     * A LAF page together with the bookkeeping needed to know when it is full and where
+     * the EOF indicator goes.
+     */
+    private class LAFFrame {
+        private ICachedPage cPage;
+        private int numOfEntries;
+        //Largest entry offset written so far; -1 if nothing has been written
+        private int maxEntryOffset;
+
+        /**
+         * Binds this frame to {@code cPage} (possibly {@code null}) and resets its counters.
+         */
+        public void setCachedPage(ICachedPage cPage) {
+            this.cPage = cPage;
+            numOfEntries = 0;
+            maxEntryOffset = -1;
+        }
+
+        /**
+         * Writes the (offset, size) entry of {@code compressedPageId} into this LAF page.
+         */
+        public void writePageInfo(int compressedPageId, long offset, long size) {
+            //Widen before multiplying to avoid int overflow (which would yield a negative offset)
+            final int entryOffset = (int) ((long) compressedPageId * ENTRY_LENGTH % bufferCache.getPageSize());
+            //Put page offset
+            cPage.getBuffer().putLong(entryOffset, offset);
+            //Put page size
+            cPage.getBuffer().putLong(entryOffset + SIZE_ENTRY_OFFSET, size);
+            //Keep the max entry offset to set EOF (if needed)
+            maxEntryOffset = Math.max(maxEntryOffset, entryOffset);
+            numOfEntries++;
+        }
+
+        /**
+         * Writes the EOF indicator right after the entry with the largest offset.
+         */
+        public void setEOF() {
+            cPage.getBuffer().putLong(maxEntryOffset + ENTRY_LENGTH, EOF);
+        }
+
+        /**
+         * @return {@code true} if the number of entries written equals the page capacity
+         */
+        public boolean isFull() {
+            return numOfEntries == maxNumOfEntries;
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/NoOpLAFWriter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/NoOpLAFWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/NoOpLAFWriter.java
new file mode 100644
index 0000000..8f957a9
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/NoOpLAFWriter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.compression.file;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+
+/**
+ * An {@link ICompressedPageWriter} whose operations do nothing.
+ */
+public class NoOpLAFWriter implements ICompressedPageWriter {
+    public static final NoOpLAFWriter INSTANCE = new NoOpLAFWriter();
+    /**
+     * Misspelled alias of {@link #INSTANCE}, kept so that existing callers continue to compile.
+     */
+    public static final NoOpLAFWriter INSTACNE = INSTANCE;
+
+    private NoOpLAFWriter() {
+    }
+
+    @Override
+    public void prepareWrite(ICachedPage cPage) throws HyracksDataException {
+        //NoOp
+    }
+
+    @Override
+    public void abort() {
+        //NoOp
+    }
+
+    @Override
+    public void endWriting() throws HyracksDataException {
+        //NoOp
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java
index 4f15588..11862dc 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java
@@ -18,18 +18,32 @@
*/
package org.apache.hyracks.storage.common.file;
-import java.util.concurrent.atomic.AtomicInteger;
+import static org.apache.hyracks.storage.common.buffercache.BufferCache.DEBUG;
-import org.apache.hyracks.api.io.IFileHandle;
+import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
-public class BufferedFileHandle {
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.storage.common.buffercache.AbstractBufferedFileIOManager;
+import org.apache.hyracks.storage.common.buffercache.BufferCache;
+import org.apache.hyracks.storage.common.buffercache.BufferCacheHeaderHelper;
+import org.apache.hyracks.storage.common.buffercache.CachedPage;
+import org.apache.hyracks.storage.common.buffercache.IPageReplacementStrategy;
+import org.apache.hyracks.storage.common.compression.file.CompressedFileReference;
+import org.apache.hyracks.storage.common.compression.file.ICompressedPageWriter;
+import org.apache.hyracks.storage.common.compression.file.NoOpLAFWriter;
+
+public class BufferedFileHandle extends AbstractBufferedFileIOManager {
private final int fileId;
- private volatile IFileHandle handle;
private final AtomicInteger refCount;
- public BufferedFileHandle(int fileId, IFileHandle handle) {
+ protected BufferedFileHandle(int fileId, BufferCache bufferCache, IIOManager ioManager,
+ BlockingQueue<BufferCacheHeaderHelper> headerPageCache, IPageReplacementStrategy pageReplacementStrategy) {
+ super(bufferCache, ioManager, headerPageCache, pageReplacementStrategy);
this.fileId = fileId;
- this.handle = handle;
refCount = new AtomicInteger();
}
@@ -37,22 +51,6 @@ public class BufferedFileHandle {
return fileId;
}
- public void setFileHandle(IFileHandle fileHandle) {
- this.handle = fileHandle;
- }
-
- public IFileHandle getFileHandle() {
- return handle;
- }
-
- public void markAsDeleted() {
- handle = null;
- }
-
- public boolean fileHasBeenDeleted() {
- return handle == null;
- }
-
/**
 * Atomically increments the reference count of this handle.
 *
 * @return the reference count after the increment
 */
public int incReferenceCount() {
    final int updatedCount = refCount.incrementAndGet();
    return updatedCount;
}
@@ -69,6 +67,86 @@ public class BufferedFileHandle {
return getDiskPageId(fileId, pageId);
}
+ /**
+ * Reads one page-with-header worth of bytes at the page's offset, lets the header helper
+ * process the header, copies the returned buffer into the page buffer, and finally reads the
+ * extra pages (if any).
+ * If {@code verifyBytesRead} rejects the number of bytes read, the method returns without
+ * filling the page and without reading extra pages. The header helper is returned in all cases.
+ */
+ @Override
+ public void read(CachedPage cPage) throws HyracksDataException {
+ final BufferCacheHeaderHelper header = checkoutHeaderHelper();
+ try {
+ long bytesRead =
+ readToBuffer(header.prepareRead(bufferCache.getPageSizeWithHeader()), getFirstPageOffset(cPage));
+
+ if (!verifyBytesRead(bufferCache.getPageSizeWithHeader(), bytesRead)) {
+ return;
+ }
+
+ // processHeader is invoked before cPage.getBuffer() is fetched; keep this order
+ final ByteBuffer buf = header.processHeader(cPage);
+ cPage.getBuffer().put(buf);
+ } finally {
+ returnHeaderHelper(header);
+ }
+
+ readExtraPages(cPage);
+ }
+
+ /**
+ * For a page whose frame size multiplier is greater than one, reads the remaining
+ * {@code totalPages - 1} pages into the page buffer, starting right after the first page.
+ * The number of bytes read is not verified here.
+ */
+ private void readExtraPages(CachedPage cPage) throws HyracksDataException {
+ final int totalPages = cPage.getFrameSizeMultiplier();
+ if (totalPages > 1) {
+ pageReplacementStrategy.fixupCapacityOnLargeRead(cPage);
+ // Window the buffer to [pageSize, totalPages * pageSize) so the read lands after the first page
+ cPage.getBuffer().position(bufferCache.getPageSize());
+ cPage.getBuffer().limit(totalPages * bufferCache.getPageSize());
+ readToBuffer(cPage.getBuffer(), getExtraPageOffset(cPage));
+ }
+ }
+
+ /**
+ * Writes the page (with its header) to the file, followed by its extra pages.
+ * When the extra pages directly follow the first page on disk, everything is written in
+ * one call; otherwise the extra pages are written separately at their own offset.
+ * The total number of bytes written is verified at the end.
+ */
+ @Override
+ protected void write(CachedPage cPage, BufferCacheHeaderHelper header, int totalPages, int extraBlockPageId)
+ throws HyracksDataException {
+ final ByteBuffer buf = cPage.getBuffer();
+ // True when the extra block starts at the page ID right after this page
+ final boolean contiguousLargePages = getPageId(cPage.getDiskPageId()) + 1 == extraBlockPageId;
+ long bytesWritten;
+ try {
+ // Expose all pages if contiguous, otherwise only the first page
+ buf.limit(contiguousLargePages ? bufferCache.getPageSize() * totalPages : bufferCache.getPageSize());
+ buf.position(0);
+ bytesWritten = writeToFile(header.prepareWrite(cPage), getFirstPageOffset(cPage));
+ } finally {
+ returnHeaderHelper(header);
+ }
+
+ if (totalPages > 1 && !contiguousLargePages) {
+ // Non-contiguous extra pages: extend the limit and write the rest at the extra-page offset
+ buf.limit(totalPages * bufferCache.getPageSize());
+ bytesWritten += writeToFile(buf, getExtraPageOffset(cPage));
+ }
+
+ // One page with header plus (totalPages - 1) pages without header
+ final int expectedWritten = bufferCache.getPageSizeWithHeader() + bufferCache.getPageSize() * (totalPages - 1);
+ verifyBytesWritten(expectedWritten, bytesWritten);
+ }
+
+ /**
+  * @return the file size divided by the size of a page including its header
+  */
+ @Override
+ public int getNumberOfPages() {
+     final int pageSizeWithHeader = bufferCache.getPageSizeWithHeader();
+     if (DEBUG) {
+         //The file is expected to consist of whole pages only
+         assert getFileSize() % pageSizeWithHeader == 0;
+     }
+     return (int) (getFileSize() / pageSizeWithHeader);
+ }
+
+ /**
+  * @return the shared no-op writer
+  */
+ @Override
+ public ICompressedPageWriter getCompressedPageWriter() {
+     final ICompressedPageWriter noOpWriter = NoOpLAFWriter.INSTACNE;
+     return noOpWriter;
+ }
+
+ @Override
+ protected long getFirstPageOffset(CachedPage cPage) {
+ return getPageOffset(getPageId(cPage.getDiskPageId()));
+ }
+
+ @Override
+ protected long getExtraPageOffset(CachedPage cPage) {
+ return getPageOffset(cPage.getExtraBlockPageId());
+ }
+
+ private long getPageOffset(long pageId) {
+ return pageId * bufferCache.getPageSizeWithHeader();
+ }
+
/**
 * Combines a file ID and a page ID into one disk page ID: the file ID shifted into the
 * upper 32 bits, plus the (sign-extended) page ID.
 */
public static long getDiskPageId(int fileId, int pageId) {
    final long upperBits = ((long) fileId) << 32;
    return upperBits + pageId;
}
@@ -80,4 +158,15 @@ public class BufferedFileHandle {
/**
 * Extracts the page ID, i.e. the lower 32 bits, from a disk page ID.
 */
public static int getPageId(long dpid) {
    final long lowerBits = dpid & 0xFFFFFFFFL;
    return (int) lowerBits;
}
+
+ /**
+  * Factory for file handles: returns a {@link CompressedBufferedFileHandle} when
+  * {@code fileRef} reports itself as compressed, and a plain {@link BufferedFileHandle} otherwise.
+  */
+ public static BufferedFileHandle create(FileReference fileRef, int fileId, BufferCache bufferCache,
+         IIOManager ioManager, BlockingQueue<BufferCacheHeaderHelper> headerPageCache,
+         IPageReplacementStrategy pageReplacementStrategy) {
+     if (!fileRef.isCompressed()) {
+         return new BufferedFileHandle(fileId, bufferCache, ioManager, headerPageCache, pageReplacementStrategy);
+     }
+     //A compressed reference carries the reference of its LAF file
+     final FileReference lafFileRef = ((CompressedFileReference) fileRef).getLAFFileReference();
+     return new CompressedBufferedFileHandle(fileId, lafFileRef, bufferCache, ioManager, headerPageCache,
+             pageReplacementStrategy);
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
new file mode 100644
index 0000000..235e144
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.file;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.hyracks.api.compression.ICompressorDecompressor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.storage.common.buffercache.BufferCache;
+import org.apache.hyracks.storage.common.buffercache.BufferCacheHeaderHelper;
+import org.apache.hyracks.storage.common.buffercache.CachedPage;
+import org.apache.hyracks.storage.common.buffercache.IPageReplacementStrategy;
+import org.apache.hyracks.storage.common.compression.file.CompressedFileManager;
+import org.apache.hyracks.storage.common.compression.file.CompressedFileReference;
+import org.apache.hyracks.storage.common.compression.file.ICompressedPageWriter;
+
+/**
+ * A {@link BufferedFileHandle} for compressed files. Pages are compressed on write and
+ * uncompressed on read with the {@link ICompressorDecompressor} obtained from the
+ * {@link CompressedFileManager}, which also supplies/records the offset and size of each page.
+ * A page whose compressed form is not smaller than a page is stored as is.
+ */
+public class CompressedBufferedFileHandle extends BufferedFileHandle {
+ private final FileReference lafFileRef;
+ // Created in open(); set to null in markAsDeleted() when the handle has been opened
+ private volatile CompressedFileManager compressedFileManager;
+
+ protected CompressedBufferedFileHandle(int fileId, FileReference lafFileRef, BufferCache bufferCache,
+ IIOManager ioManager, BlockingQueue<BufferCacheHeaderHelper> headerPageCache,
+ IPageReplacementStrategy pageReplacementStrategy) {
+ super(fileId, bufferCache, ioManager, headerPageCache, pageReplacementStrategy);
+ this.lafFileRef = lafFileRef;
+ }
+
+ /**
+ * Reads the stored bytes of the page, then either uncompresses them into the page buffer
+ * (stored size smaller than a page with header) or copies them as is. Extra pages, if any,
+ * are read afterwards. Returns early, without filling the page, if {@code verifyBytesRead}
+ * rejects the number of bytes read.
+ */
+ @Override
+ public void read(CachedPage cPage) throws HyracksDataException {
+ final BufferCacheHeaderHelper header = checkoutHeaderHelper();
+ try {
+ // Must precede the read: the size and offset used below are taken from cPage
+ compressedFileManager.setCompressedPageInfo(cPage);
+ long bytesRead = readToBuffer(header.prepareRead(cPage.getCompressedPageSize()), getFirstPageOffset(cPage));
+
+ if (!verifyBytesRead(cPage.getCompressedPageSize(), bytesRead)) {
+ return;
+ }
+ final ByteBuffer cBuffer = header.processHeader(cPage);
+ final ByteBuffer uBuffer = cPage.getBuffer();
+ fixBufferPointers(uBuffer, 0);
+ if (cPage.getCompressedPageSize() < bufferCache.getPageSizeWithHeader()) {
+ uncompressToPageBuffer(cBuffer, uBuffer);
+ } else {
+ // Stored size is not smaller than a page with header: copy the bytes without uncompressing
+ cPage.getBuffer().put(cBuffer);
+ }
+
+ final int totalPages = cPage.getFrameSizeMultiplier();
+ if (totalPages > 1) {
+ pageReplacementStrategy.fixupCapacityOnLargeRead(cPage);
+ readExtraPages(cPage, cBuffer);
+ }
+ } finally {
+ returnHeaderHelper(header);
+ }
+ }
+
+ /**
+ * Reads the extra pages one by one. Each extra page is either read into {@code cBuffer} and
+ * uncompressed (stored size smaller than a page) or read directly into the page buffer.
+ */
+ private void readExtraPages(CachedPage cPage, ByteBuffer cBuffer) throws HyracksDataException {
+ final ByteBuffer uBuffer = cPage.getBuffer();
+
+ final int totalPages = cPage.getFrameSizeMultiplier();
+ for (int i = 1; i < totalPages; i++) {
+ fixBufferPointers(uBuffer, i);
+ compressedFileManager.setExtraCompressedPageInfo(cPage, i - 1);
+ if (cPage.getCompressedPageSize() < bufferCache.getPageSize()) {
+ cBuffer.position(0);
+ cBuffer.limit(cPage.getCompressedPageSize());
+ readToBuffer(cBuffer, getExtraPageOffset(cPage));
+ cBuffer.flip();
+ uncompressToPageBuffer(cBuffer, cPage.getBuffer());
+ } else {
+ readToBuffer(uBuffer, getExtraPageOffset(cPage));
+ }
+ }
+ }
+
+ /**
+ * Compresses and writes the first page, recording its offset/size through the
+ * {@link CompressedFileManager}; falls back to writing the uncompressed page (with header)
+ * when compression yields no savings. Extra pages are written afterwards. The header helper
+ * is returned in all cases.
+ */
+ @Override
+ protected void write(CachedPage cPage, BufferCacheHeaderHelper header, int totalPages, int extraBlockPageId)
+ throws HyracksDataException {
+ try {
+ final ByteBuffer cBuffer = header.prepareWrite(cPage, getRequiredBufferSize());
+ final ByteBuffer uBuffer = cPage.getBuffer();
+ // NOTE(review): despite the name, this holds the full disk page ID (cPage.getDiskPageId())
+ final long pageId = cPage.getDiskPageId();
+
+ final long bytesWritten;
+ final long expectedBytesWritten;
+
+ fixBufferPointers(uBuffer, 0);
+ if (compressToWriteBuffer(uBuffer, cBuffer) < bufferCache.getPageSize()) {
+ cBuffer.position(0);
+ final long offset = compressedFileManager.writePageInfo(pageId, cBuffer.remaining());
+ expectedBytesWritten = cBuffer.limit();
+ bytesWritten = writeToFile(cBuffer, offset);
+ } else {
+ //Compression did not gain any savings
+ final ByteBuffer[] buffers = header.prepareWrite(cPage);
+ final long offset = compressedFileManager.writePageInfo(pageId, bufferCache.getPageSizeWithHeader());
+ expectedBytesWritten = buffers[0].limit() + (long) buffers[1].limit();
+ bytesWritten = writeToFile(buffers, offset);
+ }
+
+ verifyBytesWritten(expectedBytesWritten, bytesWritten);
+
+ //Write extra pages
+ if (totalPages > 1) {
+ writeExtraCompressedPages(cPage, cBuffer, totalPages, extraBlockPageId);
+ }
+
+ } finally {
+ returnHeaderHelper(header);
+ }
+ }
+
+ /**
+ * Writes the extra pages one by one: the compressed form if it is smaller than a page,
+ * otherwise the uncompressed page. The accumulated number of bytes written is verified
+ * after the loop.
+ */
+ private void writeExtraCompressedPages(CachedPage cPage, ByteBuffer cBuffer, int totalPages, int extraBlockPageId)
+ throws HyracksDataException {
+ final ByteBuffer uBuffer = cPage.getBuffer();
+ long expectedBytesWritten = 0;
+ long bytesWritten = 0;
+ for (int i = 1; i < totalPages; i++) {
+ fixBufferPointers(uBuffer, i);
+ cBuffer.position(0);
+
+ final ByteBuffer writeBuffer;
+ if (compressToWriteBuffer(uBuffer, cBuffer) < bufferCache.getPageSize()) {
+ writeBuffer = cBuffer;
+ } else {
+ // NOTE(review): assumes compress() leaves uBuffer's position/limit on the i^th page - confirm
+ writeBuffer = uBuffer;
+ }
+ final int length = writeBuffer.remaining();
+ final long offset = compressedFileManager.writeExtraPageInfo(extraBlockPageId, length, i - 1);
+ expectedBytesWritten += length;
+ bytesWritten += writeToFile(writeBuffer, offset);
+ }
+
+ verifyBytesWritten(expectedBytesWritten, bytesWritten);
+
+ }
+
+ /**
+ * Opens the LAF file through the buffer cache, creates and opens the
+ * {@link CompressedFileManager} for it, and then opens the data file.
+ */
+ @Override
+ public void open(FileReference fileRef) throws HyracksDataException {
+ final CompressedFileReference cFileRef = (CompressedFileReference) fileRef;
+ final int lafFileId = bufferCache.openFile(cFileRef.getLAFFileReference());
+
+ compressedFileManager = new CompressedFileManager(bufferCache, lafFileId, cFileRef);
+ compressedFileManager.open();
+ super.open(fileRef);
+ }
+
+ /**
+ * Decrement the reference counter for LAF file.
+ * It is up to {@link BufferCache} to physically close the file.
+ * see {@link BufferCache#deleteFile(FileReference)} and {@link BufferCache#purgeHandle(int)}
+ */
+ @Override
+ public void close() throws HyracksDataException {
+ if (hasBeenOpened()) {
+ compressedFileManager.close();
+ bufferCache.closeFile(compressedFileManager.getFileId());
+ }
+ super.close();
+ }
+
+ /**
+ * Purges the data file, then closes the {@link CompressedFileManager} and closes and purges
+ * the LAF file in the buffer cache.
+ * NOTE(review): unlike {@link #close()}, there is no {@code hasBeenOpened()} guard here -
+ * confirm this is only called on an opened handle.
+ */
+ @Override
+ public void purge() throws HyracksDataException {
+ super.purge();
+ compressedFileManager.close();
+ bufferCache.closeFile(compressedFileManager.getFileId());
+ bufferCache.purgeHandle(compressedFileManager.getFileId());
+ }
+
+ /**
+ * Deletes the LAF file through the buffer cache (by file ID if this handle has been opened,
+ * by file reference otherwise) and then marks the data file as deleted.
+ */
+ @Override
+ public void markAsDeleted() throws HyracksDataException {
+ if (hasBeenOpened()) {
+ bufferCache.deleteFile(compressedFileManager.getFileId());
+ compressedFileManager = null;
+ } else {
+ bufferCache.deleteFile(lafFileRef);
+ }
+ super.markAsDeleted();
+ }
+
+ /**
+ * Forces the data file and then the LAF file.
+ * NOTE(review): no {@code hasBeenOpened()} guard - confirm callers only force opened handles.
+ */
+ @Override
+ public void force(boolean metadata) throws HyracksDataException {
+ super.force(metadata);
+ bufferCache.force(compressedFileManager.getFileId(), metadata);
+ }
+
+ /**
+ * @return the number of pages as reported by the {@link CompressedFileManager}
+ */
+ @Override
+ public int getNumberOfPages() {
+ return compressedFileManager.getNumberOfPages();
+ }
+
+ /**
+ * @return the compressed page offset currently stored in {@code cPage}
+ */
+ @Override
+ protected long getFirstPageOffset(CachedPage cPage) {
+ return cPage.getCompressedPageOffset();
+ }
+
+ /**
+ * Same value as {@link #getFirstPageOffset(CachedPage)}.
+ * NOTE(review): presumably setExtraCompressedPageInfo has updated the offset stored in
+ * {@code cPage} before this is called - confirm.
+ */
+ @Override
+ protected long getExtraPageOffset(CachedPage cPage) {
+ return getFirstPageOffset(cPage);
+ }
+
+ /**
+ * @return the compressed page writer of the {@link CompressedFileManager}
+ */
+ @Override
+ public ICompressedPageWriter getCompressedPageWriter() {
+ return compressedFileManager.getCompressedPageWriter();
+ }
+
+ /* ********************************
+ * Compression methods
+ * ********************************
+ */
+
+ /**
+ * Positions {@code uBuffer} on the i^th page: position at {@code pageSize * i} and limit one
+ * page further.
+ */
+ private void fixBufferPointers(ByteBuffer uBuffer, int i) {
+ //Fix the uncompressed buffer to point at the i^th extra page
+ uBuffer.position(bufferCache.getPageSize() * i);
+ //Similarly, fix the limit to a page-worth of data from the i^th page
+ uBuffer.limit(uBuffer.position() + bufferCache.getPageSize());
+ }
+
+ /**
+ * Uncompresses {@code cBuffer} into {@code uBuffer} and checks that the remaining bytes of
+ * {@code uBuffer} equal the page size.
+ */
+ private void uncompressToPageBuffer(ByteBuffer cBuffer, ByteBuffer uBuffer) throws HyracksDataException {
+ final ICompressorDecompressor compDecomp = compressedFileManager.getCompressorDecompressor();
+ compDecomp.uncompress(cBuffer, uBuffer);
+ verifyUncompressionSize(bufferCache.getPageSize(), uBuffer.remaining());
+ }
+
+ /**
+ * Compresses {@code uBuffer} into {@code cBuffer}.
+ *
+ * @return the remaining bytes of {@code cBuffer} after compression
+ */
+ private int compressToWriteBuffer(ByteBuffer uBuffer, ByteBuffer cBuffer) throws HyracksDataException {
+ final ICompressorDecompressor compDecomp = compressedFileManager.getCompressorDecompressor();
+ compDecomp.compress(uBuffer, cBuffer);
+ return cBuffer.remaining();
+ }
+
+ /**
+ * @return the buffer size the compressor computes for compressing one page
+ */
+ private int getRequiredBufferSize() {
+ final ICompressorDecompressor compDecomp = compressedFileManager.getCompressorDecompressor();
+ return compDecomp.computeCompressedBufferSize(bufferCache.getPageSize());
+ }
+
+ /**
+ * Calls {@code throwException} when the uncompressed size differs from the expected size.
+ */
+ private void verifyUncompressionSize(int expected, int actual) {
+ if (expected != actual) {
+ throwException("Uncompressed", expected, actual);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeExamplesTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeExamplesTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeExamplesTest.java
index aee7e90..b8727b0 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeExamplesTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeExamplesTest.java
@@ -37,6 +37,7 @@ import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTree;
import org.apache.hyracks.storage.am.lsm.btree.util.LSMBTreeTestHarness;
import org.apache.hyracks.storage.am.lsm.btree.utils.LSMBTreeUtil;
import org.apache.hyracks.storage.common.IIndexAccessor;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
import org.apache.hyracks.util.trace.ITracer;
import org.junit.After;
import org.junit.Before;
@@ -62,7 +63,8 @@ public class LSMBTreeExamplesTest extends OrderedIndexExamplesTest {
bloomFilterKeyFields, harness.getBoomFilterFalsePositiveRate(), harness.getMergePolicy(),
harness.getOperationTracker(), harness.getIOScheduler(), harness.getIOOperationCallbackFactory(), true,
filterTypeTraits, filterCmpFactories, btreeFields, filterFields, true,
- harness.getMetadataPageManagerFactory(), false, ITracer.NONE);
+ harness.getMetadataPageManagerFactory(), false, ITracer.NONE,
+ NoOpCompressorDecompressorFactory.INSTANCE);
}
@Before
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java
index 4fafb38..d83f475 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java
@@ -31,6 +31,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.impls.NoOpOperationTrackerFactory;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
import org.apache.hyracks.util.trace.ITracer;
import org.junit.Test;
@@ -52,7 +53,8 @@ public class LSMBTreeModificationOperationCallbackTest extends AbstractModificat
harness.getBoomFilterFalsePositiveRate(), harness.getMergePolicy(),
NoOpOperationTrackerFactory.INSTANCE.getOperationTracker(null, null), harness.getIOScheduler(),
harness.getIOOperationCallbackFactory(), true, null, null, null, null, true,
- harness.getMetadataPageManagerFactory(), false, ITracer.NONE);
+ harness.getMetadataPageManagerFactory(), false, ITracer.NONE,
+ NoOpCompressorDecompressorFactory.INSTANCE);
}
@Override
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
index 3dfb369..59c9ebb 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
@@ -40,6 +40,7 @@ import org.apache.hyracks.storage.common.IIndexAccessor;
import org.apache.hyracks.storage.common.IIndexBulkLoader;
import org.apache.hyracks.storage.common.IIndexCursor;
import org.apache.hyracks.storage.common.ISearchOperationCallback;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
import org.apache.hyracks.util.trace.ITracer;
import org.junit.Assert;
import org.junit.Test;
@@ -61,7 +62,8 @@ public class LSMBTreeSearchOperationCallbackTest extends AbstractSearchOperation
harness.getBoomFilterFalsePositiveRate(), harness.getMergePolicy(),
NoOpOperationTrackerFactory.INSTANCE.getOperationTracker(null, null), harness.getIOScheduler(),
harness.getIOOperationCallbackFactory(), true, null, null, null, null, true,
- harness.getMetadataPageManagerFactory(), false, ITracer.NONE);
+ harness.getMetadataPageManagerFactory(), false, ITracer.NONE,
+ NoOpCompressorDecompressorFactory.INSTANCE);
}
@Override
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java
index 0914541..c14474b 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java
@@ -41,6 +41,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.impls.NoOpOperationTrackerFactory;
import org.apache.hyracks.storage.common.IIndexAccessor;
import org.apache.hyracks.storage.common.IModificationOperationCallback;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
import org.apache.hyracks.util.trace.ITracer;
import org.junit.After;
import org.junit.Assert;
@@ -74,7 +75,8 @@ public class LSMBTreeUpdateInPlaceTest extends AbstractOperationCallbackTest {
harness.getBoomFilterFalsePositiveRate(), harness.getMergePolicy(),
NoOpOperationTrackerFactory.INSTANCE.getOperationTracker(null, null), harness.getIOScheduler(),
harness.getIOOperationCallbackFactory(), true, null, null, null, null, true,
- harness.getMetadataPageManagerFactory(), true, ITracer.NONE);
+ harness.getMetadataPageManagerFactory(), true, ITracer.NONE,
+ NoOpCompressorDecompressorFactory.INSTANCE);
}
@Override
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java
index b25a229..baa95e9 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java
@@ -39,6 +39,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
import org.apache.hyracks.storage.common.IStorageManager;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -57,12 +58,13 @@ public class TestLsmBtreeLocalResource extends LSMBTreeLocalResource {
super(typeTraits, cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate, isPrimary, path,
storageManager, mergePolicyFactory, mergePolicyProperties, filterTypeTraits, filterCmpFactories,
btreeFields, filterFields, opTrackerProvider, ioOpCallbackFactory, metadataPageManagerFactory,
- vbcProvider, ioSchedulerProvider, durable);
+ vbcProvider, ioSchedulerProvider, durable, NoOpCompressorDecompressorFactory.INSTANCE);
}
protected TestLsmBtreeLocalResource(IPersistedResourceRegistry registry, JsonNode json, int[] bloomFilterKeyFields,
double bloomFilterFalsePositiveRate, boolean isPrimary, int[] btreeFields) throws HyracksDataException {
- super(registry, json, bloomFilterKeyFields, bloomFilterFalsePositiveRate, isPrimary, btreeFields);
+ super(registry, json, bloomFilterKeyFields, bloomFilterFalsePositiveRate, isPrimary, btreeFields,
+ NoOpCompressorDecompressorFactory.INSTANCE);
}
@Override
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResourceFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResourceFactory.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResourceFactory.java
index 6b13f56..3d7c520 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResourceFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResourceFactory.java
@@ -32,6 +32,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
import org.apache.hyracks.storage.am.lsm.common.dataflow.LsmResource;
import org.apache.hyracks.storage.common.IStorageManager;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
public class TestLsmBtreeLocalResourceFactory extends LSMBTreeLocalResourceFactory {
private static final long serialVersionUID = 1L;
@@ -47,7 +48,7 @@ public class TestLsmBtreeLocalResourceFactory extends LSMBTreeLocalResourceFacto
super(storageManager, typeTraits, cmpFactories, filterTypeTraits, filterCmpFactories, filterFields,
opTrackerFactory, ioOpCallbackFactory, metadataPageManagerFactory, vbcProvider, ioSchedulerProvider,
mergePolicyFactory, mergePolicyProperties, durable, bloomFilterKeyFields, bloomFilterFalsePositiveRate,
- isPrimary, btreeFields);
+ isPrimary, btreeFields, NoOpCompressorDecompressorFactory.INSTANCE);
}
@Override
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/multithread/LSMBTreeMultiThreadTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/multithread/LSMBTreeMultiThreadTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/multithread/LSMBTreeMultiThreadTest.java
index 6950f86..85038c7 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/multithread/LSMBTreeMultiThreadTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/multithread/LSMBTreeMultiThreadTest.java
@@ -32,6 +32,7 @@ import org.apache.hyracks.storage.am.common.api.ITreeIndex;
import org.apache.hyracks.storage.am.common.datagen.ProbabilityHelper;
import org.apache.hyracks.storage.am.lsm.btree.util.LSMBTreeTestHarness;
import org.apache.hyracks.storage.am.lsm.btree.utils.LSMBTreeUtil;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
import org.apache.hyracks.util.trace.ITracer;
public class LSMBTreeMultiThreadTest extends OrderedIndexMultiThreadTest {
@@ -57,7 +58,8 @@ public class LSMBTreeMultiThreadTest extends OrderedIndexMultiThreadTest {
harness.getFileReference(), harness.getDiskBufferCache(), typeTraits, cmpFactories,
bloomFilterKeyFields, harness.getBoomFilterFalsePositiveRate(), harness.getMergePolicy(),
harness.getOperationTracker(), harness.getIOScheduler(), harness.getIOOperationCallbackFactory(), true,
- null, null, null, null, true, harness.getMetadataPageManagerFactory(), false, ITracer.NONE);
+ null, null, null, null, true, harness.getMetadataPageManagerFactory(), false, ITracer.NONE,
+ NoOpCompressorDecompressorFactory.INSTANCE);
}
@Override
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
index 2462c85..3bb1fa4 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
@@ -36,9 +36,9 @@ import org.apache.hyracks.storage.am.common.datagen.TupleBatch;
import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTree;
import org.apache.hyracks.storage.am.lsm.btree.utils.LSMBTreeUtil;
+import org.apache.hyracks.storage.am.lsm.common.api.IIoOperationFailedCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
-import org.apache.hyracks.storage.am.lsm.common.api.IIoOperationFailedCallback;
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import org.apache.hyracks.storage.am.lsm.common.impls.AsynchronousScheduler;
import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
@@ -48,6 +48,7 @@ import org.apache.hyracks.storage.am.lsm.common.impls.VirtualBufferCache;
import org.apache.hyracks.storage.common.IIndexAccessor;
import org.apache.hyracks.storage.common.buffercache.HeapBufferAllocator;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
import org.apache.hyracks.test.support.TestStorageManagerComponentHolder;
import org.apache.hyracks.test.support.TestUtils;
import org.apache.hyracks.util.ExitUtil;
@@ -125,7 +126,7 @@ public class LSMTreeRunner implements IExperimentRunner {
cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate, new NoMergePolicy(),
new ThreadCountingTracker(), ioScheduler, NoOpIOOperationCallbackFactory.INSTANCE, true, null, null,
null, null, true, TestStorageManagerComponentHolder.getMetadataPageManagerFactory(), false,
- ITracer.NONE);
+ ITracer.NONE, NoOpCompressorDecompressorFactory.INSTANCE);
}
@Override