You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2016/04/18 12:21:40 UTC
lucene-solr:master: SOLR-8349: Allow sharing of large in memory data
structures across cores
Repository: lucene-solr
Updated Branches:
refs/heads/master 4751b83c9 -> 9a1880aee
SOLR-8349: Allow sharing of large in memory data structures across cores
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/9a1880ae
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/9a1880ae
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/9a1880ae
Branch: refs/heads/master
Commit: 9a1880aee821d4e6e96a8ff2fb15062b1e4c9eb1
Parents: 4751b83
Author: Noble Paul <no...@apache.org>
Authored: Mon Apr 18 15:51:19 2016 +0530
Committer: Noble Paul <no...@apache.org>
Committed: Mon Apr 18 15:51:19 2016 +0530
----------------------------------------------------------------------
solr/CHANGES.txt | 3 +
.../org/apache/solr/core/BlobRepository.java | 220 +++++++++++--------
.../java/org/apache/solr/core/PluginBag.java | 35 ++-
.../src/java/org/apache/solr/core/SolrCore.java | 32 +++
.../solr/configsets/resource-sharing/schema.xml | 25 +++
.../configsets/resource-sharing/solrconfig.xml | 51 +++++
.../solr/core/BlobRepositoryCloudTest.java | 138 ++++++++++++
.../solr/core/BlobRepositoryMockingTest.java | 165 ++++++++++++++
.../apache/solr/core/TestDynamicLoading.java | 4 +-
.../component/ResourceSharingTestComponent.java | 149 +++++++++++++
10 files changed, 724 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9a1880ae/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 5d81091..6fc116b 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -96,6 +96,9 @@ New Features
* SOLR-8962: Add sort Streaming Expression. The expression takes a single input stream and a
comparator and outputs tuples in stable order of the comparator. (Dennis Gove)
+* SOLR-8349: Allow sharing of large in memory data structures across cores (Gus Heck, noble)
+
+
Bug Fixes
----------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9a1880ae/solr/core/src/java/org/apache/solr/core/BlobRepository.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/BlobRepository.java b/solr/core/src/java/org/apache/solr/core/BlobRepository.java
index 09461f0..0f3d1c3 100644
--- a/solr/core/src/java/org/apache/solr/core/BlobRepository.java
+++ b/solr/core/src/java/org/apache/solr/core/BlobRepository.java
@@ -31,9 +31,9 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipInputStream;
+import java.util.regex.Pattern;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
@@ -46,18 +46,21 @@ import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.handler.admin.CollectionsHandler;
-import org.apache.solr.util.CryptoKeys;
import org.apache.solr.util.SimplePostTool;
import org.apache.zookeeper.server.ByteBufferInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.solr.common.SolrException.ErrorCode.SERVICE_UNAVAILABLE;
+import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
+
/**
* The purpose of this class is to store the Jars loaded in memory and to keep only one copy of the Jar in a single node.
*/
public class BlobRepository {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
static final Random RANDOM;
+ static final Pattern BLOB_KEY_PATTERN_CHECKER = Pattern.compile(".*/\\d+");
static {
// We try to make things reproducible in the context of our tests by initializing the random instance
@@ -71,81 +74,113 @@ public class BlobRepository {
}
private final CoreContainer coreContainer;
- private Map<String, BlobContent> blobs = new ConcurrentHashMap<>();
+ private Map<String, BlobContent> blobs = createMap();
- public BlobRepository(CoreContainer coreContainer) {
- this.coreContainer = coreContainer;
+ // for unit tests to override
+ ConcurrentHashMap<String, BlobContent> createMap() {
+ return new ConcurrentHashMap<>();
}
- public static ByteBuffer getFileContent(BlobContent blobContent, String entryName) throws IOException {
- ByteArrayInputStream zipContents = new ByteArrayInputStream(blobContent.buffer.array(), blobContent.buffer.arrayOffset(), blobContent.buffer.limit());
- ZipInputStream zis = new ZipInputStream(zipContents);
- try {
- ZipEntry entry;
- while ((entry = zis.getNextEntry()) != null) {
- if (entryName == null || entryName.equals(entry.getName())) {
- SimplePostTool.BAOS out = new SimplePostTool.BAOS();
- byte[] buffer = new byte[2048];
- int size;
- while ((size = zis.read(buffer, 0, buffer.length)) != -1) {
- out.write(buffer, 0, size);
- }
- out.close();
- return out.getByteBuffer();
- }
- }
- } finally {
- zis.closeEntry();
- }
- return null;
+ public BlobRepository(CoreContainer coreContainer) {
+ this.coreContainer = coreContainer;
}
+ // I wanted to use {@link SolrCore#loadDecodeAndCacheBlob(String, Decoder)} below but precommit complains
/**
- * Returns the contents of a jar and increments a reference count. Please return the same object to decrease the refcount
+ * Returns the contents of a blob containing a ByteBuffer and increments a reference count. Please return the
+ * same object to decrease the refcount. This is normally used for storing jar files, and binary raw data.
+ * If you are caching Java Objects you want to use {@code SolrCore#loadDecodeAndCacheBlob(String, Decoder)}
*
* @param key it is a combination of blobname and version like blobName/version
- * @return The reference of a jar
+ * @return The reference of a blob
*/
- public BlobContentRef getBlobIncRef(String key) {
- BlobContent aBlob = blobs.get(key);
- if (aBlob == null) {
- if (this.coreContainer.isZooKeeperAware()) {
- Replica replica = getSystemCollReplica();
- String url = replica.getStr(BASE_URL_PROP) + "/.system/blob/" + key + "?wt=filestream";
-
- HttpClient httpClient = coreContainer.getUpdateShardHandler().getHttpClient();
- HttpGet httpGet = new HttpGet(url);
- ByteBuffer b;
- try {
- HttpResponse entity = httpClient.execute(httpGet, HttpClientUtil.createNewHttpClientRequestContext());
- int statusCode = entity.getStatusLine().getStatusCode();
- if (statusCode != 200) {
- throw new SolrException(SolrException.ErrorCode.NOT_FOUND, "no such blob or version available: " + key);
- }
- b = SimplePostTool.inputStreamToByteArray(entity.getEntity().getContent());
- } catch (Exception e) {
- if (e instanceof SolrException) {
- throw (SolrException) e;
- } else {
- throw new SolrException(SolrException.ErrorCode.NOT_FOUND, "could not load : " + key, e);
+ public BlobContentRef<ByteBuffer> getBlobIncRef(String key) {
+ return getBlobIncRef(key, () -> addBlob(key));
+ }
+
+ /**
+ * Internal method that returns the contents of a blob and increments a reference count. Please return the same
+ * object to decrease the refcount. Only the decoded content will be cached when this method is used. Component
+ * authors attempting to share objects across cores should use
+ * {@code SolrCore#loadDecodeAndCacheBlob(String, Decoder)} which ensures that a proper close hook is also created.
+ *
+ * @param key it is a combination of blob name and version like blobName/version
+ * @param decoder a decoder that knows how to interpret the bytes from the blob
+ * @return The reference of a blob
+ */
+ BlobContentRef<Object> getBlobIncRef(String key, Decoder<Object> decoder) {
+ return getBlobIncRef(key.concat(decoder.getName()), () -> addBlob(key,decoder));
+ }
+
+ // do the actual work returning the appropriate type...
+ private <T> BlobContentRef<T> getBlobIncRef(String key, Callable<BlobContent<T>> blobCreator) {
+ BlobContent<T> aBlob;
+ if (this.coreContainer.isZooKeeperAware()) {
+ synchronized (blobs) {
+ aBlob = blobs.get(key);
+ if (aBlob == null) {
+ try {
+ aBlob = blobCreator.call();
+ } catch (Exception e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Blob loading failed: "+e.getMessage(), e);
}
- } finally {
- httpGet.releaseConnection();
}
- blobs.put(key, aBlob = new BlobContent(key, b));
- } else {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Jar loading is not supported in non-cloud mode");
- // todo
}
-
+ } else {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Blob loading is not supported in non-cloud mode");
+ // todo
}
-
- BlobContentRef ref = new BlobContentRef(aBlob);
+ BlobContentRef<T> ref = new BlobContentRef<>(aBlob);
synchronized (aBlob.references) {
aBlob.references.add(ref);
}
return ref;
+ }
+ // For use cases sharing raw bytes
+ private BlobContent<ByteBuffer> addBlob(String key) {
+ ByteBuffer b = fetchBlob(key);
+ BlobContent<ByteBuffer> aBlob = new BlobContent<>(key, b);
+ blobs.put(key, aBlob);
+ return aBlob;
+ }
+
+ // for use cases sharing java objects
+ private BlobContent<Object> addBlob(String key, Decoder<Object> decoder) {
+ ByteBuffer b = fetchBlob(key);
+ String keyPlusName = key + decoder.getName();
+ BlobContent<Object> aBlob = new BlobContent<>(keyPlusName, b, decoder);
+ blobs.put(keyPlusName, aBlob);
+ return aBlob;
+ }
+
+ /**
+ * Package local for unit tests only; please do not use elsewhere
+ */
+ ByteBuffer fetchBlob(String key) {
+ Replica replica = getSystemCollReplica();
+ String url = replica.getStr(BASE_URL_PROP) + "/.system/blob/" + key + "?wt=filestream";
+
+ HttpClient httpClient = coreContainer.getUpdateShardHandler().getHttpClient();
+ HttpGet httpGet = new HttpGet(url);
+ ByteBuffer b;
+ try {
+ HttpResponse entity = httpClient.execute(httpGet);
+ int statusCode = entity.getStatusLine().getStatusCode();
+ if (statusCode != 200) {
+ throw new SolrException(SolrException.ErrorCode.NOT_FOUND, "no such blob or version available: " + key);
+ }
+ b = SimplePostTool.inputStreamToByteArray(entity.getEntity().getContent());
+ } catch (Exception e) {
+ if (e instanceof SolrException) {
+ throw (SolrException) e;
+ } else {
+ throw new SolrException(SolrException.ErrorCode.NOT_FOUND, "could not load : " + key, e);
+ }
+ } finally {
+ httpGet.releaseConnection();
+ }
+ return b;
}
private Replica getSystemCollReplica() {
@@ -193,61 +228,60 @@ public class BlobRepository {
blobs.remove(ref.blob.key);
}
}
-
}
- public static class BlobContent {
- private final String key;
- private Map<String, Object> decodedObjects = null;
- // TODO move this off-heap
- private final ByteBuffer buffer;
+ public static class BlobContent<T> {
+ public final String key;
+ private final T content; // holds byte buffer or cached object, holding both is a waste of memory
// ref counting mechanism
private final Set<BlobContentRef> references = new HashSet<>();
+ public BlobContent(String key, ByteBuffer buffer, Decoder<T> decoder) {
+ this.key = key;
+ this.content = decoder.decode(new ByteBufferInputStream(buffer));
+ }
+ @SuppressWarnings("unchecked")
public BlobContent(String key, ByteBuffer buffer) {
this.key = key;
- this.buffer = buffer;
+ this.content = (T) buffer;
}
/**
- * This method decodes the byte[] to a custom Object
- *
- * @param key The key is used to store the decoded Object. it is possible to have multiple
- * decoders for the same blob (may be unusual).
- * @param decoder A decoder instance
- * @return the decoded Object . If it was already decoded, then return from the cache
+ * Get the cached object.
+ *
+ * @return the object representing the content that is cached.
*/
- public <T> T decodeAndCache(String key, Decoder<T> decoder) {
- if (decodedObjects == null) {
- synchronized (this) {
- if (decodedObjects == null) decodedObjects = new ConcurrentHashMap<>();
- }
- }
-
- Object t = decodedObjects.get(key);
- if (t != null) return (T) t;
- t = decoder.decode(new ByteBufferInputStream(buffer));
- decodedObjects.put(key, t);
- return (T) t;
-
- }
-
- public String checkSignature(String base64Sig, CryptoKeys keys) {
- return keys.verify(base64Sig, buffer);
+ public T get() {
+ return this.content;
}
}
public interface Decoder<T> {
+ /**
+ * A name by which to distinguish this decoding. This only needs to be implemented if you want to support
+ * decoding the same blob content with more than one decoder.
+ *
+ * @return The name of the decoding, defaults to empty string.
+ */
+ default String getName() { return ""; }
+
+ /**
+ * A routine that knows how to convert the stream of bytes from the blob into a Java object.
+ *
+ * @param inputStream the bytes from a blob
+ * @return A Java object of the specified type.
+ */
T decode(InputStream inputStream);
}
- public static class BlobContentRef {
- public final BlobContent blob;
- private BlobContentRef(BlobContent blob) {
+ public static class BlobContentRef<T> {
+ public final BlobContent<T> blob;
+
+ private BlobContentRef(BlobContent<T> blob) {
this.blob = blob;
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9a1880ae/solr/core/src/java/org/apache/solr/core/PluginBag.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/PluginBag.java b/solr/core/src/java/org/apache/solr/core/PluginBag.java
index 0defdad..412bd93 100644
--- a/solr/core/src/java/org/apache/solr/core/PluginBag.java
+++ b/solr/core/src/java/org/apache/solr/core/PluginBag.java
@@ -16,6 +16,7 @@
*/
package org.apache.solr.core;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.nio.ByteBuffer;
@@ -28,6 +29,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
import org.apache.lucene.analysis.util.ResourceLoader;
import org.apache.lucene.analysis.util.ResourceLoaderAware;
@@ -38,6 +41,7 @@ import org.apache.solr.handler.RequestHandlerBase;
import org.apache.solr.handler.component.SearchComponent;
import org.apache.solr.request.SolrRequestHandler;
import org.apache.solr.util.CryptoKeys;
+import org.apache.solr.util.SimplePostTool;
import org.apache.solr.util.plugin.NamedListInitializedPlugin;
import org.apache.solr.util.plugin.PluginInfoInitialized;
import org.apache.solr.util.plugin.SolrCoreAware;
@@ -386,7 +390,7 @@ public class PluginBag<T> implements AutoCloseable {
*/
public static class RuntimeLib implements PluginInfoInitialized, AutoCloseable {
private String name, version, sig;
- private BlobRepository.BlobContentRef jarContent;
+ private BlobRepository.BlobContentRef<ByteBuffer> jarContent;
private final CoreContainer coreContainer;
private boolean verified = false;
@@ -430,10 +434,35 @@ public class PluginBag<T> implements AutoCloseable {
public ByteBuffer getFileContent(String entryName) throws IOException {
if (jarContent == null)
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "jar not available: " + name + "/" + version);
- return BlobRepository.getFileContent(jarContent.blob, entryName);
+ return getFileContent(jarContent.blob, entryName);
}
+ public ByteBuffer getFileContent(BlobRepository.BlobContent<ByteBuffer> blobContent, String entryName) throws IOException {
+ ByteBuffer buff = blobContent.get();
+ ByteArrayInputStream zipContents = new ByteArrayInputStream(buff.array(), buff.arrayOffset(), buff.limit());
+ ZipInputStream zis = new ZipInputStream(zipContents);
+ try {
+ ZipEntry entry;
+ while ((entry = zis.getNextEntry()) != null) {
+ if (entryName == null || entryName.equals(entry.getName())) {
+ SimplePostTool.BAOS out = new SimplePostTool.BAOS();
+ byte[] buffer = new byte[2048];
+ int size;
+ while ((size = zis.read(buffer, 0, buffer.length)) != -1) {
+ out.write(buffer, 0, size);
+ }
+ out.close();
+ return out.getByteBuffer();
+ }
+ }
+ } finally {
+ zis.closeEntry();
+ }
+ return null;
+ }
+
+
@Override
public void close() throws Exception {
if (jarContent != null) coreContainer.getBlobRepository().decrementBlobRefCount(jarContent);
@@ -472,7 +501,7 @@ public class PluginBag<T> implements AutoCloseable {
}
try {
- String matchedKey = jarContent.blob.checkSignature(sig, new CryptoKeys(keys));
+ String matchedKey = new CryptoKeys(keys).verify(sig, jarContent.blob.get());
if (matchedKey == null)
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "No key matched signature for jar : " + name + " version: " + version);
log.info("Jar {} signed with {} successfully verified", name, matchedKey);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9a1880ae/solr/core/src/java/org/apache/solr/core/SolrCore.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index faac0a2..bb0cd05 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -2569,6 +2569,38 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
}
return implicits;
}
+
+ /**
+ * Convenience method to load a blob. This method minimizes the degree to which component and other code needs
+ * to depend on the structure of solr's object graph and ensures that a proper close hook is registered. This method
+ * should normally be called in {@link SolrCoreAware#inform(SolrCore)}, and should never be called during request
+ * processing. The Decoder will only run on the first invocation; subsequent invocations will return the
+ * cached object.
+ *
+ * @param key A key in the format of name/version for a blob stored in the .system blob store via the Blob Store API
+ * @param decoder a decoder with which to convert the blob into a Java Object representation (first time only)
+ * @return a reference to the blob that has already cached the decoded version.
+ */
+ public BlobRepository.BlobContentRef loadDecodeAndCacheBlob(String key, BlobRepository.Decoder<Object> decoder) {
+ // make sure component authors don't give us oddball keys with no version...
+ if (!BlobRepository.BLOB_KEY_PATTERN_CHECKER.matcher(key).matches()) {
+ throw new IllegalArgumentException("invalid key format, must end in /N where N is the version number");
+ }
+ CoreContainer coreContainer = getCoreDescriptor().getCoreContainer();
+ // define the blob
+ BlobRepository.BlobContentRef blobRef = coreContainer.getBlobRepository().getBlobIncRef(key, decoder);
+ addCloseHook(new CloseHook() {
+ @Override
+ public void preClose(SolrCore core) {
+ }
+
+ @Override
+ public void postClose(SolrCore core) {
+ core.getCoreDescriptor().getCoreContainer().getBlobRepository().decrementBlobRefCount(blobRef);
+ }
+ });
+ return blobRef;
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9a1880ae/solr/core/src/test-files/solr/configsets/resource-sharing/schema.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/configsets/resource-sharing/schema.xml b/solr/core/src/test-files/solr/configsets/resource-sharing/schema.xml
new file mode 100644
index 0000000..1288cf4
--- /dev/null
+++ b/solr/core/src/test-files/solr/configsets/resource-sharing/schema.xml
@@ -0,0 +1,25 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<schema name="minimal" version="1.1">
+ <types>
+ <fieldType name="string" class="solr.StrField"/>
+ </types>
+ <fields>
+ <dynamicField name="*" type="string" indexed="true" stored="true" />
+ </fields>
+</schema>
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9a1880ae/solr/core/src/test-files/solr/configsets/resource-sharing/solrconfig.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/configsets/resource-sharing/solrconfig.xml b/solr/core/src/test-files/solr/configsets/resource-sharing/solrconfig.xml
new file mode 100644
index 0000000..1dd92fe
--- /dev/null
+++ b/solr/core/src/test-files/solr/configsets/resource-sharing/solrconfig.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- Minimal solrconfig.xml with /select, /admin and /update only -->
+
+<config>
+
+ <dataDir>${solr.data.dir:}</dataDir>
+
+ <directoryFactory name="DirectoryFactory"
+ class="${solr.directoryFactory:solr.NRTCachingDirectoryFactory}"/>
+ <schemaFactory class="ClassicIndexSchemaFactory"/>
+
+ <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+
+ <updateHandler class="solr.DirectUpdateHandler2">
+ <commitWithin>
+ <softCommit>${solr.commitwithin.softcommit:true}</softCommit>
+ </commitWithin>
+
+ </updateHandler>
+ <searchComponent name="testComponent" class="org.apache.solr.handler.component.ResourceSharingTestComponent" />
+
+ <requestHandler name="/select" class="solr.SearchHandler">
+ <lst name="defaults">
+ <str name="echoParams">explicit</str>
+ <str name="indent">true</str>
+ <str name="df">text</str>
+ </lst>
+ <arr name="first-components">
+ <str>testComponent</str>
+ </arr>
+ </requestHandler>
+</config>
+
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9a1880ae/solr/core/src/test/org/apache/solr/core/BlobRepositoryCloudTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/core/BlobRepositoryCloudTest.java b/solr/core/src/test/org/apache/solr/core/BlobRepositoryCloudTest.java
new file mode 100644
index 0000000..3e51b36
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/core/BlobRepositoryCloudTest.java
@@ -0,0 +1,138 @@
+package org.apache.solr.core;
+
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Set;
+
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.cloud.ZkTestServer;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.zookeeper.server.DataNode;
+import org.apache.zookeeper.server.DataTree;
+import org.apache.zookeeper.server.ZKDatabase;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public class BlobRepositoryCloudTest extends SolrCloudTestCase {
+
+ public static final Path TEST_PATH = getFile("solr/configsets").toPath();
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ configureCluster(1) // only sharing *within* a node
+ .addConfig("configname", TEST_PATH.resolve("resource-sharing"))
+ .configure();
+// Thread.sleep(2000);
+ HashMap<String, String> params = new HashMap<>();
+ cluster.createCollection(".system", 1, 1, null, params);
+// Thread.sleep(2000);
+ // test component will fail if it can't find a blob with this data by this name
+ postBlob("testResource", "foo,bar\nbaz,bam");
+// Thread.sleep(2000);
+ // if these don't load we probably failed to post the blob above
+ cluster.createCollection("col1", 1, 1, "configname", params);
+ cluster.createCollection("col2", 1, 1, "configname", params);
+// Thread.sleep(2000);
+ SolrInputDocument document = new SolrInputDocument();
+ document.addField("id", "1");
+ document.addField("text", "col1");
+ CloudSolrClient solrClient = cluster.getSolrClient();
+ solrClient.add("col1", document);
+ solrClient.commit("col1");
+ document = new SolrInputDocument();
+ document.addField("id", "1");
+ document.addField("text", "col2");
+ solrClient.add("col2", document);
+ solrClient.commit("col2");
+ Thread.sleep(2000);
+
+ }
+
+ @Test
+ public void test() throws Exception {
+ // This test relies on the installation of ResourceSharingTestComponent which has 2 useful properties:
+ // 1. it will fail to initialize if it doesn't find a 2 line CSV like foo,bar\nbaz,bam thus validating
+ // that we are properly pulling data from the blob store
+ // 2. It replaces any q for a query request to /select with "text:<name>" where <name> is the name
+ // of the last collection to run a query. It does this by caching a shared resource of type
+ // ResourceSharingTestComponent.TestObject, and the following sequence is proof that either
+ // collection can tell if it was (or was not) the last collection to issue a query by
+ // consulting the shared object
+ assertLastQueryNotToCollection("col1");
+ assertLastQueryNotToCollection("col2");
+ assertLastQueryNotToCollection("col1");
+ assertLastQueryToCollection("col1");
+ assertLastQueryNotToCollection("col2");
+ assertLastQueryToCollection("col2");
+ }
+
+ // TODO: move this up to parent class? Probably accepting entity, or with alternative signatures
+ private static void postBlob(String name, String string) throws IOException {
+ HttpPost post = new HttpPost(findLiveNodeURI() + "/.system/blob/" + name);
+ StringEntity csv = new StringEntity(string, ContentType.create("application/octet-stream"));
+ post.setEntity(csv);
+ try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
+ httpclient.execute(post);
+ }
+ }
+
+ // TODO: move this up to parent class?
+ private static String findLiveNodeURI() {
+ ZkTestServer zkServer = cluster.getZkServer();
+ ZKDatabase zkDatabase = zkServer.getZKDatabase();
+ DataTree dataTree = zkDatabase.getDataTree();
+ DataNode node = dataTree.getNode("/solr/live_nodes");
+ Set<String> children = node.getChildren();
+ String liveNode = children.iterator().next();
+ String[] split = liveNode.split("_");
+ String host = split[0];
+ String name = split[1];
+ return "http://" + host + "/" + name;
+ }
+
+ private void assertLastQueryToCollection(String collection) throws SolrServerException, IOException {
+ assertEquals(1, getSolrDocuments(collection).size());
+ }
+
+ private void assertLastQueryNotToCollection(String collection) throws SolrServerException, IOException {
+ assertEquals(0, getSolrDocuments(collection).size());
+ }
+
+ private SolrDocumentList getSolrDocuments(String collection) throws SolrServerException, IOException {
+ SolrQuery query = new SolrQuery("*:*");
+ CloudSolrClient client = cluster.getSolrClient();
+ QueryResponse resp1 = client.query(collection, query);
+ return resp1.getResults();
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9a1880ae/solr/core/src/test/org/apache/solr/core/BlobRepositoryMockingTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/core/BlobRepositoryMockingTest.java b/solr/core/src/test/org/apache/solr/core/BlobRepositoryMockingTest.java
new file mode 100644
index 0000000..e82915f
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/core/BlobRepositoryMockingTest.java
@@ -0,0 +1,165 @@
+package org.apache.solr.core;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.solr.common.SolrException;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Unit tests for {@link BlobRepository#getBlobIncRef} that run without a cluster.
+ *
+ * <p>The {@link CoreContainer} and the repository's backing map are EasyMock mocks, and the
+ * repository under test overrides {@code fetchBlob} and {@code createMap} so that fetching a
+ * blob only records the requested key and hands back a fixed buffer. Each test records its
+ * expectations, switches the mocks to replay mode, exercises the repository, and leaves the
+ * verification of the recorded expectations to {@link #tearDown()}.
+ */
+public class BlobRepositoryMockingTest {
+
+  private static final Charset UTF8 = Charset.forName("UTF-8");
+  private static final String[][] PARSED = new String[][]{{"foo", "bar", "baz"}, {"bang", "boom", "bash"}};
+  private static final String BLOBSTR = "foo,bar,baz\nbang,boom,bash";
+  private CoreContainer mockContainer = EasyMock.createMock(CoreContainer.class);
+  @SuppressWarnings("unchecked")
+  private ConcurrentHashMap<String, BlobRepository.BlobContent> mapMock = EasyMock.createMock(ConcurrentHashMap.class);
+  @SuppressWarnings("unchecked")
+  private BlobRepository.Decoder<Object> decoderMock = EasyMock.createMock(BlobRepository.Decoder.class);
+  @SuppressWarnings("unchecked")
+  private BlobRepository.BlobContent<Object> blobContentMock = EasyMock.createMock(BlobRepository.BlobContent.class);
+
+  // All mocks, so they can be reset / replayed / verified together.
+  private Object[] mocks = new Object[] {
+      mockContainer,
+      decoderMock,
+      blobContentMock,
+      mapMock
+  };
+
+  BlobRepository repository;
+  // Buffer handed out by the overridden fetchBlob().
+  ByteBuffer blobData = ByteBuffer.wrap(BLOBSTR.getBytes(UTF8));
+  // Set by the overridden fetchBlob(): whether it was called, and with which key.
+  boolean blobFetched = false;
+  String blobKey = "";
+
+
+  /** Resets the recording flags and the mocks, and builds a repository wired to them. */
+  @Before
+  public void setUp() throws IllegalAccessException, NoSuchFieldException {
+    blobFetched = false;
+    blobKey = "";
+    EasyMock.reset(mocks);
+    repository = new BlobRepository(mockContainer) {
+      @Override
+      ByteBuffer fetchBlob(String key) {
+        blobKey = key;
+        blobFetched = true;
+        return blobData;
+      }
+
+      @Override
+      ConcurrentHashMap<String, BlobContent> createMap() {
+        return mapMock;
+      }
+
+    };
+  }
+
+  /** Verifies that every expectation recorded by the test was actually met. */
+  @After
+  public void tearDown() {
+    EasyMock.verify(mocks);
+  }
+
+  /** A container that is not ZooKeeper aware must make the lookup fail with a SolrException. */
+  @Test (expected = SolrException.class)
+  public void testCloudOnly() {
+    expect(mockContainer.isZooKeeperAware()).andReturn(false);
+    EasyMock.replay(mocks);
+    repository.getBlobIncRef("foo!");
+  }
+
+  /** A key missing from the map is fetched, stored in the map, and returned as raw bytes. */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testGetBlobIncrRefString() {
+    expect(mockContainer.isZooKeeperAware()).andReturn(true);
+    expect(mapMock.get("foo!")).andReturn(null);
+    expect(mapMock.put(eq("foo!"), anyObject(BlobRepository.BlobContent.class))).andReturn(null);
+    EasyMock.replay(mocks);
+    BlobRepository.BlobContentRef ref = repository.getBlobIncRef("foo!");
+    // assertEquals reports both values on failure, unlike assertTrue(a.equals(b)).
+    assertEquals("foo!", blobKey);
+    assertTrue(blobFetched);
+    assertNotNull(ref.blob);
+    assertEquals(blobData, ref.blob.get());
+  }
+
+  /** A key already present in the map is served from it without calling fetchBlob. */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testCachedAlready() {
+    expect(mockContainer.isZooKeeperAware()).andReturn(true);
+    // The cached content is the raw buffer, so the type argument is ByteBuffer.
+    expect(mapMock.get("foo!")).andReturn(new BlobRepository.BlobContent<ByteBuffer>("foo!", blobData));
+    EasyMock.replay(mocks);
+    BlobRepository.BlobContentRef ref = repository.getBlobIncRef("foo!");
+    assertEquals("", blobKey);
+    assertFalse(blobFetched);
+    assertNotNull(ref.blob);
+    assertEquals(blobData, ref.blob.get());
+  }
+
+  /**
+   * With a decoder, the blob is fetched under the plain key, stored in the map under the key
+   * suffixed with the decoder name, and the decoder's result is what the reference exposes.
+   */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testGetBlobIncrRefStringDecoder() {
+    expect(mockContainer.isZooKeeperAware()).andReturn(true);
+    expect(mapMock.get("foo!mocked")).andReturn(null);
+    expect(mapMock.put(eq("foo!mocked"), anyObject(BlobRepository.BlobContent.class))).andReturn(null);
+
+    EasyMock.replay(mocks);
+    BlobRepository.BlobContentRef ref = repository.getBlobIncRef("foo!", new BlobRepository.Decoder<Object>() {
+      @Override
+      public Object decode(InputStream inputStream) {
+        StringWriter writer = new StringWriter();
+        try {
+          IOUtils.copy(inputStream, writer, UTF8);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+
+        // The decoder must see exactly the bytes returned by fetchBlob().
+        assertEquals(BLOBSTR, writer.toString());
+        return PARSED;
+      }
+
+      @Override
+      public String getName() {
+        return "mocked";
+      }
+    });
+    assertEquals("foo!", blobKey);
+    assertTrue(blobFetched);
+    assertNotNull(ref.blob);
+    // Same array instance as returned by the decoder, so Object.equals() is sufficient here.
+    assertEquals(PARSED, ref.blob.get());
+  }
+
+
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9a1880ae/solr/core/src/test/org/apache/solr/core/TestDynamicLoading.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/core/TestDynamicLoading.java b/solr/core/src/test/org/apache/solr/core/TestDynamicLoading.java
index 6570e4a..f7832ef 100644
--- a/solr/core/src/test/org/apache/solr/core/TestDynamicLoading.java
+++ b/solr/core/src/test/org/apache/solr/core/TestDynamicLoading.java
@@ -104,7 +104,7 @@ public class TestDynamicLoading extends AbstractFullDistribZkTestBase {
Map map = TestSolrConfigHandler.getRespMap("/test1?wt=json", client);
assertNotNull(TestBlobHandler.getAsString(map), map = (Map) map.get("error"));
- assertEquals(TestBlobHandler.getAsString(map), ".system collection not available", map.get("msg"));
+ assertTrue(TestBlobHandler.getAsString(map), map.get("msg").toString().contains(".system collection not available"));
TestBlobHandler.createSystemCollection(getHttpSolrClient(baseURL, randomClient.getHttpClient()));
@@ -114,7 +114,7 @@ public class TestDynamicLoading extends AbstractFullDistribZkTestBase {
assertNotNull(map = (Map) map.get("error"));
- assertEquals("full output " + TestBlobHandler.getAsString(map), "no such blob or version available: colltest/1" , map.get("msg"));
+ assertTrue("full output " + TestBlobHandler.getAsString(map), map.get("msg").toString().contains("no such blob or version available: colltest/1" ));
payload = " {\n" +
" 'set' : {'watched': {" +
" 'x':'X val',\n" +
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9a1880ae/solr/core/src/test/org/apache/solr/handler/component/ResourceSharingTestComponent.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/handler/component/ResourceSharingTestComponent.java b/solr/core/src/test/org/apache/solr/handler/component/ResourceSharingTestComponent.java
new file mode 100644
index 0000000..8223fe5
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/handler/component/ResourceSharingTestComponent.java
@@ -0,0 +1,149 @@
+package org.apache.solr.handler.component;
+
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.core.BlobRepository;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.util.plugin.SolrCoreAware;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.lang.invoke.MethodHandles;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertEquals;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public class ResourceSharingTestComponent extends SearchComponent implements SolrCoreAware {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private SolrCore core;
+ private volatile BlobRepository.BlobContent<TestObject> blob;
+
+ @SuppressWarnings("SynchronizeOnNonFinalField")
+ @Override
+ public void prepare(ResponseBuilder rb) throws IOException {
+ SolrParams params = rb.req.getParams();
+ ModifiableSolrParams mParams = new ModifiableSolrParams(params);
+ String q = "text:" + getTestObj().getLastCollection();
+ mParams.set("q", q); // search for the last collection name.
+ // This should cause the param to show up in the response...
+ rb.req.setParams(mParams);
+ getTestObj().setLastCollection(core.getCoreDescriptor().getCollectionName());
+ }
+
+ @Override
+ public void process(ResponseBuilder rb) throws IOException {}
+
+ @Override
+ public String getDescription() {
+ return "ResourceSharingTestComponent";
+ }
+
+ @Override
+ public String getSource() {
+ return null;
+ }
+
+ @SuppressWarnings("unchecked")
+ TestObject getTestObj() {
+ return this.blob.get();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void inform(SolrCore core) {
+ log.info("Informing test component...");
+ this.core = core;
+ this.blob = core.loadDecodeAndCacheBlob(getKey(), new DumbCsvDecoder()).blob;
+ log.info("Test component informed!");
+ }
+
+ private String getKey() {
+ return getResourceName() + "/" + getResourceVersion();
+ }
+
+ public String getResourceName() {
+ return "testResource";
+ }
+
+ public String getResourceVersion() {
+ return "1";
+ }
+
+ class DumbCsvDecoder implements BlobRepository.Decoder<Object> {
+ private final Map<String, String> dict = new HashMap<>();
+
+ public DumbCsvDecoder() {}
+
+ void processSimpleCsvRow(String string) {
+ String[] row = string.split(","); // dumbest csv parser ever... :)
+ getDict().put(row[0], row[1]);
+ }
+
+ public Map<String, String> getDict() {
+ return dict;
+ }
+
+ @Override
+ public TestObject decode(InputStream inputStream) {
+ // loading a tiny csv like:
+ //
+ // foo,bar
+ // baz,bam
+
+ try (Stream<String> lines = new BufferedReader(new InputStreamReader(inputStream, Charset.forName("UTF-8"))).lines()) {
+ lines.forEach(this::processSimpleCsvRow);
+ } catch (Exception e) {
+ log.error("failed to read dictionary {}", getResourceName() );
+ throw new RuntimeException("Cannot load dictionary " , e);
+ }
+
+ assertEquals("bar", dict.get("foo"));
+ assertEquals("bam", dict.get("baz"));
+ log.info("Loaded {} using {}", getDict().size(), this.getClass().getClassLoader());
+
+ // if we get here we have seen the data from the blob and all we need is to test that two collections
+ // are able to see the same object..
+ return new TestObject();
+ }
+ }
+
+
+ public static class TestObject {
+ public static final String NEVER_UPDATED = "never updated";
+ private volatile String lastCollection = NEVER_UPDATED;
+
+ public String getLastCollection() {
+ return this.lastCollection;
+ }
+
+ public void setLastCollection(String lastCollection) {
+ this.lastCollection = lastCollection;
+ }
+ }
+
+}