You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2014/01/27 18:03:10 UTC
svn commit: r1561751 - in /lucene/dev/trunk/solr: ./
core/src/java/org/apache/solr/core/
core/src/java/org/apache/solr/store/blockcache/
Author: markrmiller
Date: Mon Jan 27 17:03:09 2014
New Revision: 1561751
URL: http://svn.apache.org/r1561751
Log:
SOLR-5666: Using the hdfs write cache can result in appearance of corrupted index.
Added:
lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/Store.java (with props)
Modified:
lucene/dev/trunk/solr/CHANGES.txt
lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/HdfsDirectoryFactory.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/BlockDirectory.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/BufferStore.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/CustomBufferedIndexInput.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/ReusedBufferedIndexOutput.java
Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1561751&r1=1561750&r2=1561751&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Mon Jan 27 17:03:09 2014
@@ -229,6 +229,9 @@ Bug Fixes
* SOLR-5663: example-DIH uses non-existing column for mapping (case-sensitive)
(steffkes)
+* SOLR-5666: Using the hdfs write cache can result in appearance of corrupted
+ index. (Mark Miller)
+
Optimizations
----------------------
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/HdfsDirectoryFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/HdfsDirectoryFactory.java?rev=1561751&r1=1561750&r2=1561751&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/HdfsDirectoryFactory.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/HdfsDirectoryFactory.java Mon Jan 27 17:03:09 2014
@@ -128,12 +128,10 @@ public class HdfsDirectoryFactory extend
new Object[] {slabSize, bankCount,
((long) bankCount * (long) slabSize)});
- int _1024Size = params.getInt("solr.hdfs.blockcache.bufferstore.1024",
- 8192);
- int _8192Size = params.getInt("solr.hdfs.blockcache.bufferstore.8192",
- 8192);
+ int bufferSize = params.getInt("solr.hdfs.blockcache.bufferstore.buffersize", 128);
+ int bufferCount = params.getInt("solr.hdfs.blockcache.bufferstore.buffercount", 128 * 128);
- BufferStore.init(_1024Size, _8192Size, metrics);
+ BufferStore.initNewBuffer(bufferSize, bufferCount);
long totalMemory = (long) bankCount * (long) numberOfBlocksPerBank
* (long) blockSize;
try {
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/BlockDirectory.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/BlockDirectory.java?rev=1561751&r1=1561750&r2=1561751&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/BlockDirectory.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/BlockDirectory.java Mon Jan 27 17:03:09 2014
@@ -138,33 +138,34 @@ public class BlockDirectory extends Dire
}
static class CachedIndexInput extends CustomBufferedIndexInput {
-
- private IndexInput _source;
- private int _blockSize;
- private long _fileLength;
- private String _cacheName;
- private Cache _cache;
+ private final Store store;
+ private IndexInput source;
+ private final int blockSize;
+ private final long fileLength;
+ private final String cacheName;
+ private final Cache cache;
public CachedIndexInput(IndexInput source, int blockSize, String name,
String cacheName, Cache cache, int bufferSize) {
super(name, bufferSize);
- _source = source;
- _blockSize = blockSize;
- _fileLength = source.length();
- _cacheName = cacheName;
- _cache = cache;
+ this.source = source;
+ this.blockSize = blockSize;
+ fileLength = source.length();
+ this.cacheName = cacheName;
+ this.cache = cache;
+ store = BufferStore.instance(blockSize);
}
@Override
public IndexInput clone() {
CachedIndexInput clone = (CachedIndexInput) super.clone();
- clone._source = (IndexInput) _source.clone();
+ clone.source = (IndexInput) source.clone();
return clone;
}
@Override
public long length() {
- return _source.length();
+ return source.length();
}
@Override
@@ -186,7 +187,7 @@ public class BlockDirectory extends Dire
// read whole block into cache and then provide needed data
long blockId = getBlock(position);
int blockOffset = (int) getPosition(position);
- int lengthToReadInBlock = Math.min(len, _blockSize - blockOffset);
+ int lengthToReadInBlock = Math.min(len, blockSize - blockOffset);
if (checkCache(blockId, blockOffset, b, off, lengthToReadInBlock)) {
return lengthToReadInBlock;
} else {
@@ -199,25 +200,25 @@ public class BlockDirectory extends Dire
private void readIntoCacheAndResult(long blockId, int blockOffset,
byte[] b, int off, int lengthToReadInBlock) throws IOException {
long position = getRealPosition(blockId, 0);
- int length = (int) Math.min(_blockSize, _fileLength - position);
- _source.seek(position);
+ int length = (int) Math.min(blockSize, fileLength - position);
+ source.seek(position);
- byte[] buf = BufferStore.takeBuffer(_blockSize);
- _source.readBytes(buf, 0, length);
+ byte[] buf = store.takeBuffer(blockSize);
+ source.readBytes(buf, 0, length);
System.arraycopy(buf, blockOffset, b, off, lengthToReadInBlock);
- _cache.update(_cacheName, blockId, 0, buf, 0, _blockSize);
- BufferStore.putBuffer(buf);
+ cache.update(cacheName, blockId, 0, buf, 0, blockSize);
+ store.putBuffer(buf);
}
private boolean checkCache(long blockId, int blockOffset, byte[] b,
int off, int lengthToReadInBlock) {
- return _cache.fetch(_cacheName, blockId, blockOffset, b, off,
+ return cache.fetch(cacheName, blockId, blockOffset, b, off,
lengthToReadInBlock);
}
@Override
protected void closeInternal() throws IOException {
- _source.close();
+ source.close();
}
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/BufferStore.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/BufferStore.java?rev=1561751&r1=1561750&r2=1561751&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/BufferStore.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/BufferStore.java Mon Jan 27 17:03:09 2014
@@ -19,34 +19,50 @@ package org.apache.solr.store.blockcache
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-public class BufferStore {
-
- public static Logger LOG = LoggerFactory.getLogger(BufferStore.class);
-
- private static BlockingQueue<byte[]> _1024 = setupBuffers(1024, 1);
- private static BlockingQueue<byte[]> _8192 = setupBuffers(8192, 1);
- public static AtomicLong shardBuffercacheLost = new AtomicLong();
- public static AtomicLong shardBuffercacheAllocate1024 = new AtomicLong();
- public static AtomicLong shardBuffercacheAllocate8192 = new AtomicLong();
- public static AtomicLong shardBuffercacheAllocateOther = new AtomicLong();
-
- public static void init(int _1024Size, int _8192Size, Metrics metrics) {
-
- LOG.info("Initializing the 1024 buffers with [{}] buffers.", _1024Size);
- _1024 = setupBuffers(1024, _1024Size);
- LOG.info("Initializing the 8192 buffers with [{}] buffers.", _8192Size);
- _8192 = setupBuffers(8192, _8192Size);
- shardBuffercacheLost = metrics.shardBuffercacheLost;
- shardBuffercacheAllocate1024 = metrics.shardBuffercacheAllocate1024;
- shardBuffercacheAllocate8192 = metrics.shardBuffercacheAllocate8192;
- shardBuffercacheAllocateOther = metrics.shardBuffercacheAllocateOther;
+public class BufferStore implements Store {
+
+ private static final Store EMPTY = new Store() {
+
+ @Override
+ public byte[] takeBuffer(int bufferSize) {
+ return new byte[bufferSize];
+ }
+
+ @Override
+ public void putBuffer(byte[] buffer) {
+ }
+ };
+
+ private final static ConcurrentMap<Integer, BufferStore> bufferStores = new ConcurrentHashMap<Integer, BufferStore>();
+
+ private final BlockingQueue<byte[]> buffers;
+
+ private final int bufferSize;
+
+ public synchronized static void initNewBuffer(int bufferSize, long totalAmount) {
+ if (totalAmount == 0) {
+ return;
+ }
+ BufferStore bufferStore = bufferStores.get(bufferSize);
+ if (bufferStore == null) {
+ long count = totalAmount / bufferSize;
+ if (count > Integer.MAX_VALUE) {
+ count = Integer.MAX_VALUE;
+ }
+ BufferStore store = new BufferStore(bufferSize, (int) count);
+ bufferStores.put(bufferSize, store);
+ }
+ }
+
+ private BufferStore(int bufferSize, int count) {
+ this.bufferSize = bufferSize;
+ buffers = setupBuffers(bufferSize, count);
}
-
+
private static BlockingQueue<byte[]> setupBuffers(int bufferSize, int count) {
BlockingQueue<byte[]> queue = new ArrayBlockingQueue<byte[]>(count);
for (int i = 0; i < count; i++) {
@@ -54,57 +70,44 @@ public class BufferStore {
}
return queue;
}
-
- public static byte[] takeBuffer(int bufferSize) {
- switch (bufferSize) {
- case 1024:
- return newBuffer1024(_1024.poll());
- case 8192:
- return newBuffer8192(_8192.poll());
- default:
- return newBuffer(bufferSize);
+
+ public static Store instance(int bufferSize) {
+ BufferStore bufferStore = bufferStores.get(bufferSize);
+ if (bufferStore == null) {
+ return EMPTY;
}
+ return bufferStore;
}
-
- public static void putBuffer(byte[] buffer) {
+
+ @Override
+ public byte[] takeBuffer(int bufferSize) {
+ if (this.bufferSize != bufferSize) {
+ throw new RuntimeException("Buffer with length [" + bufferSize + "] does not match buffer size of ["
+ + bufferSize + "]");
+ }
+ return newBuffer(buffers.poll());
+ }
+
+ @Override
+ public void putBuffer(byte[] buffer) {
if (buffer == null) {
return;
}
- int bufferSize = buffer.length;
- switch (bufferSize) {
- case 1024:
- checkReturn(_1024.offer(buffer));
- return;
- case 8192:
- checkReturn(_8192.offer(buffer));
- return;
- }
- }
-
- private static void checkReturn(boolean offer) {
- if (!offer) {
- shardBuffercacheLost.incrementAndGet();
+ if (buffer.length != bufferSize) {
+ throw new RuntimeException("Buffer with length [" + buffer.length + "] does not match buffer size of ["
+ + bufferSize + "]");
}
+ checkReturn(buffers.offer(buffer));
}
-
- private static byte[] newBuffer1024(byte[] buf) {
- if (buf != null) {
- return buf;
- }
- shardBuffercacheAllocate1024.incrementAndGet();
- return new byte[1024];
+
+ private void checkReturn(boolean offer) {
+
}
-
- private static byte[] newBuffer8192(byte[] buf) {
+
+ private byte[] newBuffer(byte[] buf) {
if (buf != null) {
return buf;
}
- shardBuffercacheAllocate8192.incrementAndGet();
- return new byte[8192];
- }
-
- private static byte[] newBuffer(int size) {
- shardBuffercacheAllocateOther.incrementAndGet();
- return new byte[size];
+ return new byte[bufferSize];
}
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/CustomBufferedIndexInput.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/CustomBufferedIndexInput.java?rev=1561751&r1=1561750&r2=1561751&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/CustomBufferedIndexInput.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/CustomBufferedIndexInput.java Mon Jan 27 17:03:09 2014
@@ -35,6 +35,8 @@ public abstract class CustomBufferedInde
private int bufferLength = 0; // end of valid bytes
private int bufferPosition = 0; // next byte to read
+ private Store store;
+
@Override
public byte readByte() throws IOException {
if (bufferPosition >= bufferLength) refill();
@@ -49,6 +51,7 @@ public abstract class CustomBufferedInde
super(resourceDesc);
checkBufferSize(bufferSize);
this.bufferSize = bufferSize;
+ this.store = BufferStore.instance(bufferSize);
}
private void checkBufferSize(int bufferSize) {
@@ -179,7 +182,7 @@ public abstract class CustomBufferedInde
if (newLength <= 0) throw new EOFException("read past EOF");
if (buffer == null) {
- buffer = BufferStore.takeBuffer(bufferSize);
+ buffer = store.takeBuffer(bufferSize);
seekInternal(bufferStart);
}
readInternal(buffer, 0, newLength);
@@ -191,7 +194,7 @@ public abstract class CustomBufferedInde
@Override
public final void close() throws IOException {
closeInternal();
- BufferStore.putBuffer(buffer);
+ store.putBuffer(buffer);
buffer = null;
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/ReusedBufferedIndexOutput.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/ReusedBufferedIndexOutput.java?rev=1561751&r1=1561750&r2=1561751&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/ReusedBufferedIndexOutput.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/ReusedBufferedIndexOutput.java Mon Jan 27 17:03:09 2014
@@ -1,6 +1,6 @@
package org.apache.solr.store.blockcache;
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -38,6 +38,8 @@ public abstract class ReusedBufferedInde
/** total length of the file */
private long fileLength = 0;
+ private final Store store;
+
public ReusedBufferedIndexOutput() {
this(BUFFER_SIZE);
}
@@ -45,7 +47,8 @@ public abstract class ReusedBufferedInde
public ReusedBufferedIndexOutput(int bufferSize) {
checkBufferSize(bufferSize);
this.bufferSize = bufferSize;
- buffer = BufferStore.takeBuffer(this.bufferSize);
+ store = BufferStore.instance(bufferSize);
+ buffer = store.takeBuffer(this.bufferSize);
}
protected long getBufferStart() {
@@ -80,7 +83,7 @@ public abstract class ReusedBufferedInde
public void close() throws IOException {
flushBufferToCache();
closeInternal();
- BufferStore.putBuffer(buffer);
+ store.putBuffer(buffer);
buffer = null;
}
Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/Store.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/Store.java?rev=1561751&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/Store.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/store/blockcache/Store.java Mon Jan 27 17:03:09 2014
@@ -0,0 +1,26 @@
+package org.apache.solr.store.blockcache;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public interface Store {
+
+ byte[] takeBuffer(int bufferSize);
+
+ void putBuffer(byte[] buffer);
+
+}