You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by dl...@apache.org on 2016/04/12 15:58:28 UTC
[11/39] accumulo git commit: ACCUMULO-4164 Avoid copying rfile index
when in cache. Avoid sync when deserializing index.
ACCUMULO-4164 Avoid copying rfile index when in cache. Avoid sync when deserializing index.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/2afc3dc8
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/2afc3dc8
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/2afc3dc8
Branch: refs/heads/ACCUMULO-4173
Commit: 2afc3dc87d158667da72a8959726bce62de5dee6
Parents: 41e002d
Author: Keith Turner <kt...@apache.org>
Authored: Fri Apr 1 08:43:48 2016 -0400
Committer: Keith Turner <kt...@apache.org>
Committed: Fri Apr 1 12:56:40 2016 -0400
----------------------------------------------------------------------
.../core/file/blockfile/ABlockReader.java | 2 +
.../file/blockfile/impl/CachableBlockFile.java | 45 ++--
.../impl/SeekableByteArrayInputStream.java | 141 ++++++++++++
.../core/file/rfile/MultiLevelIndex.java | 219 +++++++++++++------
4 files changed, 311 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/2afc3dc8/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockReader.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockReader.java
index 8df2469..9d7a01a 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockReader.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockReader.java
@@ -51,4 +51,6 @@ public interface ABlockReader extends DataInput {
int getPosition();
<T> T getIndex(Class<T> clazz);
+
+ byte[] getBuffer();
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/2afc3dc8/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
index 4d65c9f..7496202 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
@@ -70,21 +70,25 @@ public class CachableBlockFile {
_bc = new BCFile.Writer(fsout, compressAlgor, conf, false, accumuloConfiguration);
}
+ @Override
public ABlockWriter prepareMetaBlock(String name) throws IOException {
_bw = new BlockWrite(_bc.prepareMetaBlock(name));
return _bw;
}
+ @Override
public ABlockWriter prepareMetaBlock(String name, String compressionName) throws IOException {
_bw = new BlockWrite(_bc.prepareMetaBlock(name, compressionName));
return _bw;
}
+ @Override
public ABlockWriter prepareDataBlock() throws IOException {
_bw = new BlockWrite(_bc.prepareDataBlock());
return _bw;
}
+ @Override
public void close() throws IOException {
_bw.close();
@@ -369,6 +373,7 @@ public class CachableBlockFile {
* NOTE: In the case of multi-read threads: This method can do redundant work where an entry is read from disk and other threads check the cache before it
* has been inserted.
*/
+ @Override
public BlockRead getMetaBlock(String blockName) throws IOException {
String _lookup = this.fileName + "M" + blockName;
return getBlock(_lookup, _iCache, new MetaBlockLoader(blockName, accumuloConfiguration));
@@ -388,6 +393,7 @@ public class CachableBlockFile {
* has been inserted.
*/
+ @Override
public BlockRead getDataBlock(int blockIndex) throws IOException {
String _lookup = this.fileName + "O" + blockIndex;
return getBlock(_lookup, _dCache, new OffsetBlockLoader(blockIndex));
@@ -400,6 +406,7 @@ public class CachableBlockFile {
return getBlock(_lookup, _dCache, new RawBlockLoader(offset, compressedSize, rawSize));
}
+ @Override
public synchronized void close() throws IOException {
if (closed)
return;
@@ -416,30 +423,6 @@ public class CachableBlockFile {
}
- static class SeekableByteArrayInputStream extends ByteArrayInputStream {
-
- public SeekableByteArrayInputStream(byte[] buf) {
- super(buf);
- }
-
- public SeekableByteArrayInputStream(byte buf[], int offset, int length) {
- super(buf, offset, length);
- throw new UnsupportedOperationException("Seek code assumes offset is zero"); // do not need this constructor, documenting that seek will not work
- // unless offset it kept track of
- }
-
- public void seek(int position) {
- if (pos < 0 || pos >= buf.length)
- throw new IllegalArgumentException("pos = " + pos + " buf.lenght = " + buf.length);
- this.pos = position;
- }
-
- public int getPosition() {
- return this.pos;
- }
-
- }
-
public static class CachedBlockRead extends BlockRead {
private SeekableByteArrayInputStream seekableInput;
private final CacheEntry cb;
@@ -470,6 +453,11 @@ public class CachableBlockFile {
}
@Override
+ public byte[] getBuffer() {
+ return seekableInput.getBuffer();
+ }
+
+ @Override
public <T> T getIndex(Class<T> clazz) {
T bi = null;
synchronized (cb) {
@@ -510,6 +498,7 @@ public class CachableBlockFile {
/**
* Size is the size of the bytearray that was read from the cache
*/
+ @Override
public long getRawSize() {
return size;
}
@@ -543,5 +532,13 @@ public class CachableBlockFile {
throw new UnsupportedOperationException();
}
+ /**
+ * The byte array returned by this method is only for read optimizations, it should not be modified.
+ */
+ @Override
+ public byte[] getBuffer() {
+ throw new UnsupportedOperationException();
+ }
+
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/2afc3dc8/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/SeekableByteArrayInputStream.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/SeekableByteArrayInputStream.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/SeekableByteArrayInputStream.java
new file mode 100644
index 0000000..c6e7d29
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/SeekableByteArrayInputStream.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.file.blockfile.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class is like byte array input stream with two differences. It supports seeking and avoids synchronization.
+ */
+public class SeekableByteArrayInputStream extends InputStream {
+
+ // making this volatile for the following case
+ // * thread 1 creates and initializes byte array
+ // * thread 2 reads from byte array
+ // Findbugs complains about this because thread2 may not see any changes to the byte array after thread 1 set the volatile,
+ // however the expectation is that the byte array is static. In the case of it being static, volatile ensures that
+ // thread 2 sees all of thread 1 changes before setting the volatile.
+ private volatile byte buffer[];
+ private int cur;
+ private int max;
+
+ @Override
+ public int read() {
+ if (cur < max) {
+ return buffer[cur++] & 0xff;
+ } else {
+ return -1;
+ }
+ }
+
+ @Override
+ public int read(byte b[], int offset, int length) {
+ if (b == null) {
+ throw new NullPointerException();
+ }
+
+ if (length < 0 || offset < 0 || length > b.length - offset) {
+ throw new IndexOutOfBoundsException();
+ }
+
+ if (length == 0) {
+ return 0;
+ }
+
+ int avail = max - cur;
+
+ if (avail <= 0) {
+ return -1;
+ }
+
+ if (length > avail) {
+ length = avail;
+ }
+
+ System.arraycopy(buffer, cur, b, offset, length);
+ cur += length;
+ return length;
+ }
+
+ @Override
+ public long skip(long requestedSkip) {
+ long actualSkip = max - cur;
+ if (requestedSkip < actualSkip)
+ if (requestedSkip < 0)
+ actualSkip = 0;
+ else
+ actualSkip = requestedSkip;
+
+ cur += actualSkip;
+ return actualSkip;
+ }
+
+ @Override
+ public int available() {
+ return max - cur;
+ }
+
+ @Override
+ public boolean markSupported() {
+ return false;
+ }
+
+ @Override
+ public void mark(int readAheadLimit) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void reset() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close() throws IOException {}
+
+ public SeekableByteArrayInputStream(byte[] buf) {
+ Preconditions.checkNotNull(buf, "bug argument was null");
+ this.buffer = buf;
+ this.cur = 0;
+ this.max = buf.length;
+ }
+
+ public SeekableByteArrayInputStream(byte[] buf, int maxOffset) {
+ Preconditions.checkNotNull(buf, "bug argument was null");
+ this.buffer = buf;
+ this.cur = 0;
+ this.max = maxOffset;
+ }
+
+ public void seek(int position) {
+ if (position < 0 || position >= max)
+ throw new IllegalArgumentException("position = " + position + " maxOffset = " + max);
+ this.cur = position;
+ }
+
+ public int getPosition() {
+ return this.cur;
+ }
+
+ byte[] getBuffer() {
+ return buffer;
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/2afc3dc8/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
index 2109478..75ad4c8 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
@@ -37,9 +37,12 @@ import org.apache.accumulo.core.file.blockfile.ABlockReader;
import org.apache.accumulo.core.file.blockfile.ABlockWriter;
import org.apache.accumulo.core.file.blockfile.BlockFileReader;
import org.apache.accumulo.core.file.blockfile.BlockFileWriter;
+import org.apache.accumulo.core.file.blockfile.impl.SeekableByteArrayInputStream;
import org.apache.accumulo.core.file.rfile.bcfile.Utils;
import org.apache.hadoop.io.WritableComparable;
+import com.google.common.base.Preconditions;
+
public class MultiLevelIndex {
public static class IndexEntry implements WritableComparable<IndexEntry> {
@@ -129,85 +132,121 @@ public class MultiLevelIndex {
}
}
- // a list that deserializes index entries on demand
- private static class SerializedIndex extends AbstractList<IndexEntry> implements List<IndexEntry>, RandomAccess {
+ private static abstract class SerializedIndexBase<T> extends AbstractList<T> implements List<T>, RandomAccess {
+ protected int[] offsets;
+ protected byte[] data;
- private int[] offsets;
- private byte[] data;
- private boolean newFormat;
+ protected SeekableByteArrayInputStream sbais;
+ protected DataInputStream dis;
+ protected int offsetsOffset;
+ protected int indexOffset;
+ protected int numOffsets;
+ protected int indexSize;
- SerializedIndex(int[] offsets, byte[] data, boolean newFormat) {
+ SerializedIndexBase(int[] offsets, byte[] data) {
+ Preconditions.checkNotNull(offsets, "offsets argument was null");
+ Preconditions.checkNotNull(data, "data argument was null");
this.offsets = offsets;
this.data = data;
- this.newFormat = newFormat;
+ sbais = new SeekableByteArrayInputStream(data);
+ dis = new DataInputStream(sbais);
}
- @Override
- public IndexEntry get(int index) {
- int len;
- if (index == offsets.length - 1)
- len = data.length - offsets[index];
- else
- len = offsets[index + 1] - offsets[index];
+ SerializedIndexBase(byte[] data, int offsetsOffset, int numOffsets, int indexOffset, int indexSize) {
+ Preconditions.checkNotNull(data, "data argument was null");
+ sbais = new SeekableByteArrayInputStream(data, indexOffset + indexSize);
+ dis = new DataInputStream(sbais);
+ this.offsetsOffset = offsetsOffset;
+ this.indexOffset = indexOffset;
+ this.numOffsets = numOffsets;
+ this.indexSize = indexSize;
+ }
- ByteArrayInputStream bais = new ByteArrayInputStream(data, offsets[index], len);
- DataInputStream dis = new DataInputStream(bais);
+ /**
+ * Before this method is called, {@code this.dis} is seeked to the offset of a serialized index entry. This method should deserialize the index entry by
+ * reading from {@code this.dis} and return it.
+ */
+ protected abstract T newValue() throws IOException;
- IndexEntry ie = new IndexEntry(newFormat);
+ @Override
+ public T get(int index) {
try {
- ie.readFields(dis);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ int offset;
+ if (offsets == null) {
+ if (index < 0 || index >= numOffsets) {
+ throw new IndexOutOfBoundsException("index:" + index + " numOffsets:" + numOffsets);
+ }
+ sbais.seek(offsetsOffset + index * 4);
+ offset = dis.readInt();
+ } else {
+ offset = offsets[index];
+ }
- return ie;
+ sbais.seek(indexOffset + offset);
+ return newValue();
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
}
@Override
public int size() {
- return offsets.length;
- }
-
- public long sizeInBytes() {
- return data.length + 4 * offsets.length;
+ if (offsets == null) {
+ return numOffsets;
+ } else {
+ return offsets.length;
+ }
}
}
- private static class KeyIndex extends AbstractList<Key> implements List<Key>, RandomAccess {
+ // a list that deserializes index entries on demand
+ private static class SerializedIndex extends SerializedIndexBase<IndexEntry> {
- private int[] offsets;
- private byte[] data;
+ private boolean newFormat;
- KeyIndex(int[] offsets, byte[] data) {
- this.offsets = offsets;
- this.data = data;
+ SerializedIndex(int[] offsets, byte[] data, boolean newFormat) {
+ super(offsets, data);
+ this.newFormat = newFormat;
+ }
+
+ SerializedIndex(byte[] data, int offsetsOffset, int numOffsets, int indexOffset, int indexSize) {
+ super(data, offsetsOffset, numOffsets, indexOffset, indexSize);
+ this.newFormat = true;
+ }
+
+ public long sizeInBytes() {
+ if (offsets == null) {
+ return indexSize + 4 * numOffsets;
+ } else {
+ return data.length + 4 * offsets.length;
+ }
}
@Override
- public Key get(int index) {
- int len;
- if (index == offsets.length - 1)
- len = data.length - offsets[index];
- else
- len = offsets[index + 1] - offsets[index];
+ protected IndexEntry newValue() throws IOException {
+ IndexEntry ie = new IndexEntry(newFormat);
+ ie.readFields(dis);
+ return ie;
+ }
+
+ }
- ByteArrayInputStream bais = new ByteArrayInputStream(data, offsets[index], len);
- DataInputStream dis = new DataInputStream(bais);
+ private static class KeyIndex extends SerializedIndexBase<Key> {
- Key key = new Key();
- try {
- key.readFields(dis);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ KeyIndex(int[] offsets, byte[] data) {
+ super(offsets, data);
+ }
- return key;
+ KeyIndex(byte[] data, int offsetsOffset, int numOffsets, int indexOffset, int indexSize) {
+ super(data, offsetsOffset, numOffsets, indexOffset, indexSize);
}
@Override
- public int size() {
- return offsets.length;
+ protected Key newValue() throws IOException {
+ Key key = new Key();
+ key.readFields(dis);
+ return key;
}
}
@@ -219,11 +258,16 @@ public class MultiLevelIndex {
private ArrayList<Integer> offsets;
private int level;
private int offset;
-
- SerializedIndex index;
- KeyIndex keyIndex;
private boolean hasNext;
+ private byte data[];
+ private int[] offsetsArray;
+ private int numOffsets;
+ private int offsetsOffset;
+ private int indexSize;
+ private int indexOffset;
+ private boolean newFormat;
+
public IndexBlock(int level, int totalAdded) {
// System.out.println("IndexBlock("+level+","+levelCount+","+totalAdded+")");
@@ -270,18 +314,35 @@ public class MultiLevelIndex {
offset = in.readInt();
hasNext = in.readBoolean();
- int numOffsets = in.readInt();
- int[] offsets = new int[numOffsets];
-
- for (int i = 0; i < numOffsets; i++)
- offsets[i] = in.readInt();
+ ABlockReader abr = (ABlockReader) in;
+ if (abr.isIndexable()) {
+ // this block is cached, so avoid copy
+ data = abr.getBuffer();
+ // use offset data in serialized form and avoid copy
+ numOffsets = abr.readInt();
+ offsetsOffset = abr.getPosition();
+ int skipped = abr.skipBytes(numOffsets * 4);
+ if (skipped != numOffsets * 4) {
+ throw new IOException("Skipped less than expected " + skipped + " " + (numOffsets * 4));
+ }
+ indexSize = in.readInt();
+ indexOffset = abr.getPosition();
+ skipped = abr.skipBytes(indexSize);
+ if (skipped != indexSize) {
+ throw new IOException("Skipped less than expected " + skipped + " " + indexSize);
+ }
+ } else {
+ numOffsets = in.readInt();
+ offsetsArray = new int[numOffsets];
- int indexSize = in.readInt();
- byte[] serializedIndex = new byte[indexSize];
- in.readFully(serializedIndex);
+ for (int i = 0; i < numOffsets; i++)
+ offsetsArray[i] = in.readInt();
- index = new SerializedIndex(offsets, serializedIndex, true);
- keyIndex = new KeyIndex(offsets, serializedIndex);
+ indexSize = in.readInt();
+ data = new byte[indexSize];
+ in.readFully(data);
+ newFormat = true;
+ }
} else if (version == RFile.RINDEX_VER_3) {
level = 0;
offset = 0;
@@ -307,9 +368,9 @@ public class MultiLevelIndex {
oia[i] = oal.get(i);
}
- byte[] serializedIndex = baos.toByteArray();
- index = new SerializedIndex(oia, serializedIndex, false);
- keyIndex = new KeyIndex(oia, serializedIndex);
+ data = baos.toByteArray();
+ offsetsArray = oia;
+ newFormat = false;
} else if (version == RFile.RINDEX_VER_4) {
level = 0;
offset = 0;
@@ -325,8 +386,9 @@ public class MultiLevelIndex {
byte[] indexData = new byte[size];
in.readFully(indexData);
- index = new SerializedIndex(offsets, indexData, false);
- keyIndex = new KeyIndex(offsets, indexData);
+ data = indexData;
+ offsetsArray = offsets;
+ newFormat = false;
} else {
throw new RuntimeException("Unexpected version " + version);
}
@@ -334,11 +396,23 @@ public class MultiLevelIndex {
}
List<IndexEntry> getIndex() {
- return index;
+ // create SerializedIndex on demand as each has an internal input stream over byte array... keeping a SerializedIndex ref for the object could lead to
+ // problems with deep copies.
+ if (offsetsArray == null) {
+ return new SerializedIndex(data, offsetsOffset, numOffsets, indexOffset, indexSize);
+ } else {
+ return new SerializedIndex(offsetsArray, data, newFormat);
+ }
}
public List<Key> getKeyIndex() {
- return keyIndex;
+ // create KeyIndex on demand as each has an internal input stream over byte array... keeping a KeyIndex ref for the object could lead to problems with
+ // deep copies.
+ if (offsetsArray == null) {
+ return new KeyIndex(data, offsetsOffset, numOffsets, indexOffset, indexSize);
+ } else {
+ return new KeyIndex(offsetsArray, data);
+ }
}
int getLevel() {
@@ -761,14 +835,15 @@ public class MultiLevelIndex {
if (count == null)
count = 0l;
- size += ib.index.sizeInBytes();
+ List<IndexEntry> index = ib.getIndex();
+ size += ((SerializedIndex) index).sizeInBytes();
count++;
sizesByLevel.put(ib.getLevel(), size);
countsByLevel.put(ib.getLevel(), count);
if (ib.getLevel() > 0) {
- for (IndexEntry ie : ib.index) {
+ for (IndexEntry ie : index) {
IndexBlock cib = getIndexBlock(ie);
getIndexInfo(cib, sizesByLevel, countsByLevel);
}