You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sh...@apache.org on 2015/08/12 07:45:13 UTC
[2/2] incubator-atlas git commit: ATLAS-91 Add solr configuration and
documentation (suma.shivaprasad via shwethags)
ATLAS-91 Add solr configuration and documentation (suma.shivaprasad via shwethags)
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/48343db9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/48343db9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/48343db9
Branch: refs/heads/master
Commit: 48343db999b495458409644c8b9d2fd0bd9fa99d
Parents: 147242e
Author: Shwetha GS <ss...@hortonworks.com>
Authored: Wed Aug 12 11:15:02 2015 +0530
Committer: Shwetha GS <ss...@hortonworks.com>
Committed: Wed Aug 12 11:15:02 2015 +0530
----------------------------------------------------------------------
client/pom.xml | 6 +
docs/src/site/twiki/InstallationSteps.twiki | 40 +
pom.xml | 71 +-
release-log.txt | 1 +
repository/pom.xml | 10 +
.../titan/diskstorage/solr/Solr5Index.java | 962 +++++++++++++++++++
.../repository/graph/TitanGraphProvider.java | 36 +
src/conf/solr/currency.xml | 67 ++
src/conf/solr/lang/stopwords_en.txt | 54 ++
src/conf/solr/protwords.txt | 21 +
src/conf/solr/schema.xml | 534 ++++++++++
src/conf/solr/solrconfig.xml | 625 ++++++++++++
src/conf/solr/stopwords.txt | 14 +
src/conf/solr/synonyms.txt | 29 +
14 files changed, 2466 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/48343db9/client/pom.xml
----------------------------------------------------------------------
diff --git a/client/pom.xml b/client/pom.xml
index d393b3a..2e27930 100755
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -37,6 +37,12 @@
<artifactId>atlas-typesystem</artifactId>
</dependency>
+ <!-- supports simple auth handler -->
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ </dependency>
+
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/48343db9/docs/src/site/twiki/InstallationSteps.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/InstallationSteps.twiki b/docs/src/site/twiki/InstallationSteps.twiki
index 0391c2d..e056d17 100644
--- a/docs/src/site/twiki/InstallationSteps.twiki
+++ b/docs/src/site/twiki/InstallationSteps.twiki
@@ -35,6 +35,15 @@ Tar is structured as follows
|- client.properties
|- atlas-env.sh
|- log4j.xml
+ |- solr
+ |- currency.xml
+ |- lang
+ |- stopwords_en.txt
+ |- protwords.txt
+ |- schema.xml
+ |- solrconfig.xml
+ |- stopwords.txt
+ |- synonyms.txt
|- docs
|- server
|- webapp
@@ -112,6 +121,37 @@ and change it to look as below
export METADATA_SERVER_OPTS="-Djava.awt.headless=true -Djava.security.krb5.realm= -Djava.security.krb5.kdc="
</verbatim>
+* Configuring SOLR as the Indexing Backend for the Graph Repository
+
+By default, Atlas uses Titan as the graph repository; it is currently the only graph repository implementation available.
+For configuring Titan to work with Solr, please follow the instructions below
+<verbatim>
+* Install solr if not already running. Versions of SOLR supported are 4.8.1 or 5.2.1.
+
+* Start solr in cloud mode.
+ SolrCloud mode uses a ZooKeeper Service as a highly available, central location for cluster management.
+ For a small cluster, running with an existing ZooKeeper quorum should be fine. For larger clusters, you would want to run a separate ZooKeeper quorum with at least 3 servers.
+ Note: Atlas currently supports solr in "cloud" mode only. "http" mode is not supported. For more information, refer to the Solr documentation - https://cwiki.apache.org/confluence/display/solr/SolrCloud
+
+* Run the following commands from SOLR_HOME directory to create collections in Solr corresponding to the indexes that Atlas uses
+ bin/solr create -c vertex_index -d ATLAS_HOME/conf/solr -shards #numShards -replicationFactor #replicationFactor
+ bin/solr create -c edge_index -d ATLAS_HOME/conf/solr -shards #numShards -replicationFactor #replicationFactor
+ bin/solr create -c fulltext_index -d ATLAS_HOME/conf/solr -shards #numShards -replicationFactor #replicationFactor
+
+ Note: If numShards and replicationFactor are not specified, they default to 1 which suffices if you are trying out solr with ATLAS on a single node instance.
+ Otherwise specify numShards according to the number of hosts that are in the Solr cluster and the maxShardsPerNode configuration.
+ The number of shards cannot exceed the total number of Solr nodes in your SolrCloud cluster
+
+* Change ATLAS configuration to point to the Solr instance setup. Please make sure the following configurations are set to the below values in ATLAS_HOME/conf/application.properties
+ atlas.graph.index.search.backend=<'solr' for solr 4.8.1>/<'solr5' for solr 5.2.1>
+ atlas.graph.index.search.solr.mode=cloud
+ atlas.graph.index.search.solr.zookeeper-url=<the ZK quorum setup for solr as comma separated value> eg: 10.1.6.4:2181,10.1.6.5:2181
+
+* Restart Atlas
+</verbatim>
+
+For more information on Titan solr configuration, please refer to http://s3.thinkaurelius.com/docs/titan/0.5.4/solr.htm
+
*Starting Atlas Server*
<verbatim>
bin/atlas_start.py [-port <port>]
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/48343db9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index facd539..3d729b5 100755
--- a/pom.xml
+++ b/pom.xml
@@ -329,8 +329,8 @@
<titan.version>0.5.4</titan.version>
<hadoop.version>2.7.0</hadoop.version>
<hbase.version>0.98.9-hadoop2</hbase.version>
+ <solr.version>5.1.0</solr.version>
<kafka.version>0.8.2.0</kafka.version>
-
<!-- scala versions -->
<scala.version>2.10.4</scala.version>
<scala.binary.version>2.10</scala.binary.version>
@@ -397,8 +397,8 @@
</activation>
<properties>
<titan.storage.backend>hbase</titan.storage.backend>
- <titan.index.backend>solr</titan.index.backend>
- <solr.zk.address>localhost:9983</solr.zk.address>
+ <titan.index.backend>solr5</titan.index.backend>
+ <solr.zk.address>localhost:2181</solr.zk.address>
<titan.storage.hostname>localhost</titan.storage.hostname>
</properties>
</profile>
@@ -578,6 +578,10 @@
<groupId>org.htrace</groupId>
<artifactId>*</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>commons-httpclient</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
</exclusions>
</dependency>
@@ -903,11 +907,49 @@
<artifactId>*</artifactId>
<groupId>org.ow2.asm</groupId>
</exclusion>
+ <exclusion>
+ <artifactId>*</artifactId>
+ <groupId>org.apache.solr</groupId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
+ <groupId>org.apache.solr</groupId>
+ <artifactId>solr-core</artifactId>
+ <version>${solr.version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>*</artifactId>
+ <groupId>org.eclipse.jetty</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>*</artifactId>
+ <groupId>org.eclipse.jetty.orbit</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>*</artifactId>
+ <groupId>org.restlet.jee</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>*</artifactId>
+ <groupId>org.ow2.asm</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>*</artifactId>
+ <groupId>org.apache.lucene</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.solr</groupId>
+ <artifactId>solr-solrj</artifactId>
+ <version>${solr.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>com.thinkaurelius.titan</groupId>
<artifactId>titan-lucene</artifactId>
<version>${titan.version}</version>
@@ -1084,7 +1126,19 @@
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
- <version>4.2.5</version>
+ <version>4.4.1</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore</artifactId>
+ <version>4.4.1</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpmime</artifactId>
+ <version>4.4.1</version>
</dependency>
<!--Test dependencies-->
@@ -1376,6 +1430,14 @@
</descriptors>
<finalName>apache-atlas-${project.version}</finalName>
</configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
</plugin>
<plugin>
@@ -1502,6 +1564,7 @@
<exclude>**/*.iml</exclude>
<exclude>**/*.json</exclude>
<exclude>**/target/**</exclude>
+ <exclude>**/target*/**</exclude>
<exclude>**/build/**</exclude>
<exclude>**/*.patch</exclude>
<exclude>derby.log</exclude>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/48343db9/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 5d83baa..5de705f 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -8,6 +8,7 @@ ATLAS-54 Rename configs in hive hook (shwethags)
ATLAS-3 Mixed Index creation fails with Date types (suma.shivaprasad via shwethags)
ALL CHANGES:
+ATLAS-91 Add solr configuration and documentation (suma.shivaprasad via shwethags)
ATLAS-95 import-hive.sh reports illegal java parameters (shwethags)
ATLAS-74 Create notification framework (shwethags)
ATLAS-93 import-hive.sh reports FileNotFoundException (shwethags)
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/48343db9/repository/pom.xml
----------------------------------------------------------------------
diff --git a/repository/pom.xml b/repository/pom.xml
index a2f8e08..8e4d0f3 100755
--- a/repository/pom.xml
+++ b/repository/pom.xml
@@ -95,6 +95,16 @@
</dependency>
<dependency>
+ <groupId>org.apache.solr</groupId>
+ <artifactId>solr-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.solr</groupId>
+ <artifactId>solr-solrj</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>com.thinkaurelius.titan</groupId>
<artifactId>titan-berkeleyje</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/48343db9/repository/src/main/java/com/thinkaurelius/titan/diskstorage/solr/Solr5Index.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/solr/Solr5Index.java b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/solr/Solr5Index.java
new file mode 100644
index 0000000..e484c18
--- /dev/null
+++ b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/solr/Solr5Index.java
@@ -0,0 +1,962 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.thinkaurelius.titan.diskstorage.solr;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import com.thinkaurelius.titan.core.Order;
+import com.thinkaurelius.titan.core.TitanElement;
+import com.thinkaurelius.titan.core.attribute.Cmp;
+import com.thinkaurelius.titan.core.attribute.Geo;
+import com.thinkaurelius.titan.core.attribute.Geoshape;
+import com.thinkaurelius.titan.core.attribute.Text;
+import com.thinkaurelius.titan.core.schema.Mapping;
+import com.thinkaurelius.titan.diskstorage.BackendException;
+import com.thinkaurelius.titan.diskstorage.BaseTransaction;
+import com.thinkaurelius.titan.diskstorage.BaseTransactionConfig;
+import com.thinkaurelius.titan.diskstorage.BaseTransactionConfigurable;
+import com.thinkaurelius.titan.diskstorage.PermanentBackendException;
+import com.thinkaurelius.titan.diskstorage.TemporaryBackendException;
+import com.thinkaurelius.titan.diskstorage.configuration.ConfigNamespace;
+import com.thinkaurelius.titan.diskstorage.configuration.ConfigOption;
+import com.thinkaurelius.titan.diskstorage.configuration.Configuration;
+import com.thinkaurelius.titan.diskstorage.indexing.IndexEntry;
+import com.thinkaurelius.titan.diskstorage.indexing.IndexFeatures;
+import com.thinkaurelius.titan.diskstorage.indexing.IndexMutation;
+import com.thinkaurelius.titan.diskstorage.indexing.IndexProvider;
+import com.thinkaurelius.titan.diskstorage.indexing.IndexQuery;
+import com.thinkaurelius.titan.diskstorage.indexing.KeyInformation;
+import com.thinkaurelius.titan.diskstorage.indexing.RawQuery;
+import com.thinkaurelius.titan.diskstorage.solr.transform.GeoToWktConverter;
+import com.thinkaurelius.titan.diskstorage.util.DefaultTransaction;
+import com.thinkaurelius.titan.graphdb.configuration.PreInitializeConfigOptions;
+import com.thinkaurelius.titan.graphdb.database.serialize.AttributeUtil;
+import com.thinkaurelius.titan.graphdb.database.serialize.attribute.AbstractDecimal;
+import com.thinkaurelius.titan.graphdb.query.TitanPredicate;
+import com.thinkaurelius.titan.graphdb.query.condition.And;
+import com.thinkaurelius.titan.graphdb.query.condition.Condition;
+import com.thinkaurelius.titan.graphdb.query.condition.Not;
+import com.thinkaurelius.titan.graphdb.query.condition.Or;
+import com.thinkaurelius.titan.graphdb.query.condition.PredicateCondition;
+import com.thinkaurelius.titan.graphdb.types.ParameterType;
+import org.apache.commons.lang.StringUtils;
+import org.apache.http.client.HttpClient;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpClientUtil;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.impl.LBHttpSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.client.solrj.util.ClientUtils;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.UUID;
+
+import static com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration.INDEX_MAX_RESULT_SET_SIZE;
+import static com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration.INDEX_NS;
+
+/**
+ * NOTE: Copied from Titan to support Solr 5. Do not change
+ */
+@PreInitializeConfigOptions
+public class Solr5Index implements IndexProvider {
+
+ private static final Logger logger = LoggerFactory.getLogger(Solr5Index.class);
+
+
+ private static final String DEFAULT_ID_FIELD = "id";
+
+ private enum Mode {
+ HTTP, CLOUD;
+
+ public static Mode parse(String mode) {
+ for (Mode m : Mode.values()) {
+ if (m.toString().equalsIgnoreCase(mode)) return m;
+ }
+ throw new IllegalArgumentException("Unrecognized mode: "+mode);
+ }
+ }
+
+ public static final ConfigNamespace SOLR_NS =
+ new ConfigNamespace(INDEX_NS, "solr", "Solr index configuration");
+
+ public static final ConfigOption<String> SOLR_MODE = new ConfigOption<String>(SOLR_NS,"mode",
+ "The operation mode for Solr which is either via HTTP (`http`) or using SolrCloud (`cloud`)",
+ ConfigOption.Type.GLOBAL_OFFLINE, "cloud");
+
+ public static final ConfigOption<Boolean> DYNAMIC_FIELDS = new ConfigOption<Boolean>(SOLR_NS,"dyn-fields",
+ "Whether to use dynamic fields (which appends the data type to the field name). If dynamic fields is disabled" +
+ "the user must map field names and define them explicitly in the schema.",
+ ConfigOption.Type.GLOBAL_OFFLINE, true);
+
+ public static final ConfigOption<String[]> KEY_FIELD_NAMES = new ConfigOption<String[]>(SOLR_NS,"key-field-names",
+ "Field name that uniquely identifies each document in Solr. Must be specified as a list of `collection=field`.",
+ ConfigOption.Type.GLOBAL, String[].class);
+
+ public static final ConfigOption<String> TTL_FIELD = new ConfigOption<String>(SOLR_NS,"ttl_field",
+ "Name of the TTL field for Solr collections.",
+ ConfigOption.Type.GLOBAL_OFFLINE, "ttl");
+
+ /** SolrCloud Configuration */
+
+ public static final ConfigOption<String> ZOOKEEPER_URL = new ConfigOption<String>(SOLR_NS,"zookeeper-url",
+ "URL of the Zookeeper instance coordinating the SolrCloud cluster",
+ ConfigOption.Type.MASKABLE, "localhost:2181");
+
+ public static final ConfigOption<Integer> NUM_SHARDS = new ConfigOption<Integer>(SOLR_NS,"num-shards",
+ "Number of shards for a collection. This applies when creating a new collection which is only supported under the SolrCloud operation mode.",
+ ConfigOption.Type.GLOBAL_OFFLINE, 1);
+
+ public static final ConfigOption<Integer> MAX_SHARDS_PER_NODE = new ConfigOption<Integer>(SOLR_NS,"max-shards-per-node",
+ "Maximum number of shards per node. This applies when creating a new collection which is only supported under the SolrCloud operation mode.",
+ ConfigOption.Type.GLOBAL_OFFLINE, 1);
+
+ public static final ConfigOption<Integer> REPLICATION_FACTOR = new ConfigOption<Integer>(SOLR_NS,"replication-factor",
+ "Replication factor for a collection. This applies when creating a new collection which is only supported under the SolrCloud operation mode.",
+ ConfigOption.Type.GLOBAL_OFFLINE, 1);
+
+
+ /** HTTP Configuration */
+
+ public static final ConfigOption<String[]> HTTP_URLS = new ConfigOption<String[]>(SOLR_NS,"http-urls",
+ "List of URLs to use to connect to Solr Servers (LBHttpSolrClient is used), don't add core or collection name to the URL.",
+ ConfigOption.Type.MASKABLE, new String[] { "http://localhost:8983/solr" });
+
+ public static final ConfigOption<Integer> HTTP_CONNECTION_TIMEOUT = new ConfigOption<Integer>(SOLR_NS,"http-connection-timeout",
+ "Solr HTTP connection timeout.",
+ ConfigOption.Type.MASKABLE, 5000);
+
+ public static final ConfigOption<Boolean> HTTP_ALLOW_COMPRESSION = new ConfigOption<Boolean>(SOLR_NS,"http-compression",
+ "Enable/disable compression on the HTTP connections made to Solr.",
+ ConfigOption.Type.MASKABLE, false);
+
+ public static final ConfigOption<Integer> HTTP_MAX_CONNECTIONS_PER_HOST = new ConfigOption<Integer>(SOLR_NS,"http-max-per-host",
+ "Maximum number of HTTP connections per Solr host.",
+ ConfigOption.Type.MASKABLE, 20);
+
+ public static final ConfigOption<Integer> HTTP_GLOBAL_MAX_CONNECTIONS = new ConfigOption<Integer>(SOLR_NS,"http-max",
+ "Maximum number of HTTP connections in total to all Solr servers.",
+ ConfigOption.Type.MASKABLE, 100);
+
+ public static final ConfigOption<Boolean> WAIT_SEARCHER = new ConfigOption<Boolean>(SOLR_NS, "wait-searcher",
+ "When mutating - wait for the index to reflect new mutations before returning. This can have a negative impact on performance.",
+ ConfigOption.Type.LOCAL, false);
+
+
+
+ private static final IndexFeatures SOLR_FEATURES = new IndexFeatures.Builder().supportsDocumentTTL()
+ .setDefaultStringMapping(Mapping.TEXT).supportedStringMappings(Mapping.TEXT, Mapping.STRING).build();
+
+ private final SolrClient solrClient;
+ private final Configuration configuration;
+ private final Mode mode;
+ private final boolean dynFields;
+ private final Map<String, String> keyFieldIds;
+ private final String ttlField;
+ private final int maxResults;
+ private final boolean waitSearcher;
+
+ public Solr5Index(final Configuration config) throws BackendException {
+ Preconditions.checkArgument(config!=null);
+ configuration = config;
+
+ mode = Mode.parse(config.get(SOLR_MODE));
+ dynFields = config.get(DYNAMIC_FIELDS);
+ keyFieldIds = parseKeyFieldsForCollections(config);
+ maxResults = config.get(INDEX_MAX_RESULT_SET_SIZE);
+ ttlField = config.get(TTL_FIELD);
+ waitSearcher = config.get(WAIT_SEARCHER);
+
+ if (mode==Mode.CLOUD) {
+ String zookeeperUrl = config.get(Solr5Index.ZOOKEEPER_URL);
+ CloudSolrClient cloudServer = new CloudSolrClient(zookeeperUrl, true);
+ cloudServer.connect();
+ solrClient = cloudServer;
+ } else if (mode==Mode.HTTP) {
+ HttpClient clientParams = HttpClientUtil.createClient(new ModifiableSolrParams() {{
+ add(HttpClientUtil.PROP_ALLOW_COMPRESSION, config.get(HTTP_ALLOW_COMPRESSION).toString());
+ add(HttpClientUtil.PROP_CONNECTION_TIMEOUT, config.get(HTTP_CONNECTION_TIMEOUT).toString());
+ add(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, config.get(HTTP_MAX_CONNECTIONS_PER_HOST).toString());
+ add(HttpClientUtil.PROP_MAX_CONNECTIONS, config.get(HTTP_GLOBAL_MAX_CONNECTIONS).toString());
+ }});
+
+ solrClient = new LBHttpSolrClient(clientParams, config.get(HTTP_URLS));
+
+
+ } else {
+ throw new IllegalArgumentException("Unsupported Solr operation mode: " + mode);
+ }
+ }
+
+ private Map<String, String> parseKeyFieldsForCollections(Configuration config) throws BackendException {
+ Map<String, String> keyFieldNames = new HashMap<String, String>();
+ String[] collectionFieldStatements = config.has(KEY_FIELD_NAMES)?config.get(KEY_FIELD_NAMES):new String[0];
+ for (String collectionFieldStatement : collectionFieldStatements) {
+ String[] parts = collectionFieldStatement.trim().split("=");
+ if (parts.length != 2) {
+ throw new PermanentBackendException("Unable to parse the collection name / key field name pair. It should be of the format collection=field");
+ }
+ String collectionName = parts[0];
+ String keyFieldName = parts[1];
+ keyFieldNames.put(collectionName, keyFieldName);
+ }
+ return keyFieldNames;
+ }
+
+ private String getKeyFieldId(String collection) {
+ String field = keyFieldIds.get(collection);
+ if (field==null) field = DEFAULT_ID_FIELD;
+ return field;
+ }
+
+ /**
+ * Unlike the ElasticSearch Index, which is schema free, Solr requires a schema to
+ * support searching. This means that you will need to modify the solr schema with the
+ * appropriate field definitions in order to work properly. If you have a running instance
+ * of Solr and you modify its schema with new fields, don't forget to re-index!
+ * @param store Index store
+ * @param key New key to register
+ * @param information Datatype to register for the key
+ * @param tx enclosing transaction
+ * @throws com.thinkaurelius.titan.diskstorage.BackendException
+ */
+ @Override
+ public void register(String store, String key, KeyInformation information, BaseTransaction tx) throws BackendException {
+ if (mode==Mode.CLOUD) {
+ CloudSolrClient client = (CloudSolrClient) solrClient;
+ try {
+ createCollectionIfNotExists(client, configuration, store);
+ } catch (IOException e) {
+ throw new PermanentBackendException(e);
+ } catch (SolrServerException e) {
+ throw new PermanentBackendException(e);
+ } catch (InterruptedException e) {
+ throw new PermanentBackendException(e);
+ } catch (KeeperException e) {
+ throw new PermanentBackendException(e);
+ }
+ }
+ //Since all data types must be defined in the schema.xml, pre-registering a type does not work
+ }
+
+ @Override
+ public void mutate(Map<String, Map<String, IndexMutation>> mutations, KeyInformation.IndexRetriever informations, BaseTransaction tx) throws BackendException {
+ logger.debug("Mutating SOLR");
+ try {
+ for (Map.Entry<String, Map<String, IndexMutation>> stores : mutations.entrySet()) {
+ String collectionName = stores.getKey();
+ String keyIdField = getKeyFieldId(collectionName);
+
+ List<String> deleteIds = new ArrayList<String>();
+ Collection<SolrInputDocument> changes = new ArrayList<SolrInputDocument>();
+
+ for (Map.Entry<String, IndexMutation> entry : stores.getValue().entrySet()) {
+ String docId = entry.getKey();
+ IndexMutation mutation = entry.getValue();
+ Preconditions.checkArgument(!(mutation.isNew() && mutation.isDeleted()));
+ Preconditions.checkArgument(!mutation.isNew() || !mutation.hasDeletions());
+ Preconditions.checkArgument(!mutation.isDeleted() || !mutation.hasAdditions());
+
+ //Handle any deletions
+ if (mutation.hasDeletions()) {
+ if (mutation.isDeleted()) {
+ logger.trace("Deleting entire document {}", docId);
+ deleteIds.add(docId);
+ } else {
+ HashSet<IndexEntry> fieldDeletions = Sets.newHashSet(mutation.getDeletions());
+ if (mutation.hasAdditions()) {
+ for (IndexEntry indexEntry : mutation.getAdditions()) {
+ fieldDeletions.remove(indexEntry);
+ }
+ }
+ deleteIndividualFieldsFromIndex(collectionName, keyIdField, docId, fieldDeletions);
+ }
+ }
+
+ if (mutation.hasAdditions()) {
+ int ttl = mutation.determineTTL();
+
+ SolrInputDocument doc = new SolrInputDocument();
+ doc.setField(keyIdField, docId);
+
+ boolean isNewDoc = mutation.isNew();
+
+ if (isNewDoc)
+ logger.trace("Adding new document {}", docId);
+
+ for (IndexEntry e : mutation.getAdditions()) {
+ final Object fieldValue = convertValue(e.value);
+ doc.setField(e.field, isNewDoc
+ ? fieldValue : new HashMap<String, Object>(1) {{ put("set", fieldValue); }});
+ }
+ if (ttl>0) {
+ Preconditions.checkArgument(isNewDoc,"Solr only supports TTL on new documents [%s]",docId);
+ doc.setField(ttlField, String.format("+%dSECONDS", ttl));
+ }
+ changes.add(doc);
+ }
+ }
+
+ commitDeletes(collectionName, deleteIds);
+ commitDocumentChanges(collectionName, changes);
+ }
+ } catch (Exception e) {
+ throw storageException(e);
+ }
+ }
+
+ private Object convertValue(Object value) throws BackendException {
+ if (value instanceof Geoshape)
+ return GeoToWktConverter.convertToWktString((Geoshape) value);
+ // in order to serialize/deserialize properly Solr will have to have an
+ // access to Titan source which has Decimal type, so for now we simply convert to
+ // double and let Solr do the same thing or fail.
+ if (value instanceof AbstractDecimal)
+ return ((AbstractDecimal) value).doubleValue();
+ if (value instanceof UUID)
+ return value.toString();
+ return value;
+ }
+
+ @Override
+ public void restore(Map<String, Map<String, List<IndexEntry>>> documents, KeyInformation.IndexRetriever informations, BaseTransaction tx) throws BackendException {
+ try {
+ for (Map.Entry<String, Map<String, List<IndexEntry>>> stores : documents.entrySet()) {
+ final String collectionName = stores.getKey();
+
+ List<String> deleteIds = new ArrayList<String>();
+ List<SolrInputDocument> newDocuments = new ArrayList<SolrInputDocument>();
+
+ for (Map.Entry<String, List<IndexEntry>> entry : stores.getValue().entrySet()) {
+ final String docID = entry.getKey();
+ final List<IndexEntry> content = entry.getValue();
+
+ if (content == null || content.isEmpty()) {
+ if (logger.isTraceEnabled())
+ logger.trace("Deleting document [{}]", docID);
+
+ deleteIds.add(docID);
+ continue;
+ }
+
+ newDocuments.add(new SolrInputDocument() {{
+ setField(getKeyFieldId(collectionName), docID);
+
+ for (IndexEntry addition : content) {
+ Object fieldValue = addition.value;
+ setField(addition.field, convertValue(fieldValue));
+ }
+ }});
+ }
+
+ commitDeletes(collectionName, deleteIds);
+ commitDocumentChanges(collectionName, newDocuments);
+ }
+ } catch (Exception e) {
+ throw new TemporaryBackendException("Could not restore Solr index", e);
+ }
+ }
+
+ private void deleteIndividualFieldsFromIndex(String collectionName, String keyIdField, String docId, HashSet<IndexEntry> fieldDeletions) throws SolrServerException, IOException {
+ if (fieldDeletions.isEmpty()) return;
+
+ Map<String, String> fieldDeletes = new HashMap<String, String>(1) {{ put("set", null); }};
+
+ SolrInputDocument doc = new SolrInputDocument();
+ doc.addField(keyIdField, docId);
+ StringBuilder sb = new StringBuilder();
+ for (IndexEntry fieldToDelete : fieldDeletions) {
+ doc.addField(fieldToDelete.field, fieldDeletes);
+ sb.append(fieldToDelete).append(",");
+ }
+
+ if (logger.isTraceEnabled())
+ logger.trace("Deleting individual fields [{}] for document {}", sb.toString(), docId);
+
+ UpdateRequest singleDocument = newUpdateRequest();
+ singleDocument.add(doc);
+ solrClient.request(singleDocument, collectionName);
+ }
+
+ private void commitDocumentChanges(String collectionName, Collection<SolrInputDocument> documents) throws SolrServerException, IOException {
+ if (documents.size() == 0) return;
+
+ try {
+ solrClient.request(newUpdateRequest().add(documents), collectionName);
+ } catch (HttpSolrClient.RemoteSolrException rse) {
+ logger.error("Unable to save documents to Solr as one of the shape objects stored were not compatible with Solr.", rse);
+ logger.error("Details in failed document batch: ");
+ for (SolrInputDocument d : documents) {
+ Collection<String> fieldNames = d.getFieldNames();
+ for (String name : fieldNames) {
+ logger.error(name + ":" + d.getFieldValue(name).toString());
+ }
+ }
+
+ throw rse;
+ }
+ }
+
+ private void commitDeletes(String collectionName, List<String> deleteIds) throws SolrServerException, IOException {
+ if (deleteIds.size() == 0) return;
+ solrClient.request(newUpdateRequest().deleteById(deleteIds), collectionName);
+ }
+
+ @Override
+ public List<String> query(IndexQuery query, KeyInformation.IndexRetriever informations, BaseTransaction tx) throws BackendException {
+ List<String> result;
+ String collection = query.getStore();
+ String keyIdField = getKeyFieldId(collection);
+ SolrQuery solrQuery = new SolrQuery("*:*");
+ String queryFilter = buildQueryFilter(query.getCondition(), informations.get(collection));
+ solrQuery.addFilterQuery(queryFilter);
+ if (!query.getOrder().isEmpty()) {
+ List<IndexQuery.OrderEntry> orders = query.getOrder();
+ for (IndexQuery.OrderEntry order1 : orders) {
+ String item = order1.getKey();
+ SolrQuery.ORDER order = order1.getOrder() == Order.ASC ? SolrQuery.ORDER.asc : SolrQuery.ORDER.desc;
+ solrQuery.addSort(new SolrQuery.SortClause(item, order));
+ }
+ }
+ solrQuery.setStart(0);
+ if (query.hasLimit()) {
+ solrQuery.setRows(query.getLimit());
+ } else {
+ solrQuery.setRows(maxResults);
+ }
+ try {
+ QueryResponse response = solrClient.query(collection, solrQuery);
+
+ if (logger.isDebugEnabled())
+ logger.debug("Executed query [{}] in {} ms", query.getCondition(), response.getElapsedTime());
+
+ int totalHits = response.getResults().size();
+
+ if (!query.hasLimit() && totalHits >= maxResults)
+ logger.warn("Query result set truncated to first [{}] elements for query: {}", maxResults, query);
+
+ result = new ArrayList<String>(totalHits);
+ for (SolrDocument hit : response.getResults()) {
+ result.add(hit.getFieldValue(keyIdField).toString());
+ }
+ } catch (IOException e) {
+ logger.error("Query did not complete : ", e);
+ throw new PermanentBackendException(e);
+ } catch (SolrServerException e) {
+ logger.error("Unable to query Solr index.", e);
+ throw new PermanentBackendException(e);
+ }
+ return result;
+ }
+
+ /**
+  * Runs a raw (already Solr-formatted) query string against the collection named by the
+  * query's store and returns each matching document's key-field value with its score.
+  * <p>
+  * Paging starts at the query's offset; when the query carries no limit, at most
+  * {@code maxResults} rows are requested and a warning is logged if that many come back.
+  *
+  * @throws BackendException (permanent) if Solr reports an error or the request fails with an I/O error
+  */
+ @Override
+ public Iterable<RawQuery.Result<String>> query(RawQuery query, KeyInformation.IndexRetriever informations, BaseTransaction tx) throws BackendException {
+     String collection = query.getStore();
+     String keyIdField = getKeyFieldId(collection);
+
+     SolrQuery solrQuery = new SolrQuery(query.getQuery());
+     solrQuery.addField(keyIdField);
+     solrQuery.setIncludeScore(true);
+     solrQuery.setStart(query.getOffset());
+     solrQuery.setRows(query.hasLimit() ? query.getLimit() : maxResults);
+
+     try {
+         QueryResponse response = solrClient.query(collection, solrQuery);
+         if (logger.isDebugEnabled()) {
+             logger.debug("Executed query [{}] in {} ms", query.getQuery(), response.getElapsedTime());
+         }
+
+         List<SolrDocument> hits = response.getResults();
+         int totalHits = hits.size();
+         if (!query.hasLimit() && totalHits >= maxResults) {
+             logger.warn("Query result set truncated to first [{}] elements for query: {}", maxResults, query);
+         }
+
+         List<RawQuery.Result<String>> scored = new ArrayList<RawQuery.Result<String>>(totalHits);
+         for (SolrDocument hit : hits) {
+             String id = hit.getFieldValue(keyIdField).toString();
+             double score = Double.parseDouble(hit.getFieldValue("score").toString());
+             scored.add(new RawQuery.Result<String>(id, score));
+         }
+         return scored;
+     } catch (IOException e) {
+         logger.error("Query did not complete : ", e);
+         throw new PermanentBackendException(e);
+     } catch (SolrServerException e) {
+         logger.error("Unable to query Solr index.", e);
+         throw new PermanentBackendException(e);
+     }
+ }
+
+ /**
+  * Converts the value to its string form and escapes it with
+  * {@code ClientUtils.escapeQueryChars} so it can be embedded in a Solr query.
+  */
+ private static String escapeValue(Object value) {
+     String raw = value.toString();
+     return ClientUtils.escapeQueryChars(raw);
+ }
+
+ /**
+ * Translates a Titan condition tree into a Solr filter-query string.
+ * <p>
+ * Predicate conditions are rendered according to the runtime type of their value
+ * (Number, String, Geoshape, Date, Boolean, UUID). Not, And and Or conditions are
+ * rendered by recursing into their children; children that render to a blank string
+ * are skipped.
+ *
+ * @param condition the condition (or condition tree) to translate
+ * @param informations key information for the store; used to look up the string mapping of a key
+ * @return the filter string; may be empty (for example a CONTAINS value that tokenizes to no terms)
+ * @throws IllegalArgumentException if the predicate, value type or condition type is not supported
+ */
+ public String buildQueryFilter(Condition<TitanElement> condition, KeyInformation.StoreRetriever informations) {
+ if (condition instanceof PredicateCondition) {
+ PredicateCondition<String, TitanElement> atom = (PredicateCondition<String, TitanElement>) condition;
+ Object value = atom.getValue();
+ String key = atom.getKey();
+ TitanPredicate titanPredicate = atom.getPredicate();
+
+ if (value instanceof Number) {
+ String queryValue = escapeValue(value);
+ Preconditions.checkArgument(titanPredicate instanceof Cmp, "Relation not supported on numeric types: " + titanPredicate);
+ Cmp numRel = (Cmp) titanPredicate;
+ switch (numRel) {
+ case EQUAL:
+ return (key + ":" + queryValue);
+ case NOT_EQUAL:
+ return ("-" + key + ":" + queryValue);
+ case LESS_THAN:
+ //use right curly to mean up to but not including value
+ return (key + ":[* TO " + queryValue + "}");
+ case LESS_THAN_EQUAL:
+ return (key + ":[* TO " + queryValue + "]");
+ case GREATER_THAN:
+ //use left curly to mean greater than but not including value
+ return (key + ":{" + queryValue + " TO *]");
+ case GREATER_THAN_EQUAL:
+ return (key + ":[" + queryValue + " TO *]");
+ default: throw new IllegalArgumentException("Unexpected relation: " + numRel);
+ }
+ } else if (value instanceof String) {
+ // The key's mapping (TEXT vs STRING) decides which predicates are legal for it.
+ Mapping map = getStringMapping(informations.get(key));
+ assert map==Mapping.TEXT || map==Mapping.STRING;
+ if (map==Mapping.TEXT && !titanPredicate.toString().startsWith("CONTAINS"))
+ throw new IllegalArgumentException("Text mapped string values only support CONTAINS queries and not: " + titanPredicate);
+ if (map==Mapping.STRING && titanPredicate.toString().startsWith("CONTAINS"))
+ throw new IllegalArgumentException("String mapped string values do not support CONTAINS queries: " + titanPredicate);
+
+ //Special case
+ if (titanPredicate == Text.CONTAINS) {
+ //e.g. - if terms tomorrow and world were supplied, and fq=text:(tomorrow world)
+ //sample data set would return 2 documents: one where text = Tomorrow is the World,
+ //and the second where text = Hello World. Hence, we are decomposing the query string
+ //and building an AND query explicitly because we need AND semantics
+ value = ((String) value).toLowerCase();
+ List<String> terms = Text.tokenize((String) value);
+
+ if (terms.isEmpty()) {
+ return "";
+ } else if (terms.size() == 1) {
+ return (key + ":(" + escapeValue(terms.get(0)) + ")");
+ } else {
+ // One CONTAINS condition per term, combined through the And branch below.
+ And<TitanElement> andTerms = new And<TitanElement>();
+ for (String term : terms) {
+ andTerms.add(new PredicateCondition<String, TitanElement>(key, titanPredicate, term));
+ }
+ return buildQueryFilter(andTerms, informations);
+ }
+ }
+ if (titanPredicate == Text.PREFIX || titanPredicate == Text.CONTAINS_PREFIX) {
+ return (key + ":" + escapeValue(value) + "*");
+ } else if (titanPredicate == Text.REGEX || titanPredicate == Text.CONTAINS_REGEX) {
+ // NOTE(review): the regex is inserted without escaping; a value containing "/" would
+ // end the pattern early — confirm whether callers guarantee this cannot happen.
+ return (key + ":/" + value + "/");
+ } else if (titanPredicate == Cmp.EQUAL) {
+ return (key + ":\"" + escapeValue(value) + "\"");
+ } else if (titanPredicate == Cmp.NOT_EQUAL) {
+ return ("-" + key + ":\"" + escapeValue(value) + "\"");
+ } else {
+ throw new IllegalArgumentException("Relation is not supported for string value: " + titanPredicate);
+ }
+ } else if (value instanceof Geoshape) {
+ // NOTE(review): the predicate itself is not inspected in this branch; only the shape type is.
+ Geoshape geo = (Geoshape)value;
+ if (geo.getType() == Geoshape.Type.CIRCLE) {
+ Geoshape.Point center = geo.getPoint();
+ return ("{!geofilt sfield=" + key +
+ " pt=" + center.getLatitude() + "," + center.getLongitude() +
+ " d=" + geo.getRadius() + "} distErrPct=0"); //distance in kilometers
+ } else if (geo.getType() == Geoshape.Type.BOX) {
+ Geoshape.Point southwest = geo.getPoint(0);
+ Geoshape.Point northeast = geo.getPoint(1);
+ return (key + ":[" + southwest.getLatitude() + "," + southwest.getLongitude() +
+ " TO " + northeast.getLatitude() + "," + northeast.getLongitude() + "]");
+ } else if (geo.getType() == Geoshape.Type.POLYGON) {
+ List<Geoshape.Point> coordinates = getPolygonPoints(geo);
+ // Points are written as "longitude latitude" pairs inside the POLYGON literal.
+ StringBuilder poly = new StringBuilder(key + ":\"IsWithin(POLYGON((");
+ for (Geoshape.Point coordinate : coordinates) {
+ poly.append(coordinate.getLongitude()).append(" ").append(coordinate.getLatitude()).append(", ");
+ }
+ //close the polygon with the first coordinate
+ poly.append(coordinates.get(0).getLongitude()).append(" ").append(coordinates.get(0).getLatitude());
+ poly.append(")))\" distErrPct=0");
+ return (poly.toString());
+ }
+ // NOTE(review): a Geoshape of any other type falls out of this chain and reaches the
+ // final "return null" at the bottom of the method.
+ } else if (value instanceof Date) {
+ String queryValue = escapeValue(toIsoDate((Date)value));
+ Preconditions.checkArgument(titanPredicate instanceof Cmp, "Relation not supported on date types: " + titanPredicate);
+ Cmp numRel = (Cmp) titanPredicate;
+
+ switch (numRel) {
+ case EQUAL:
+ return (key + ":" + queryValue);
+ case NOT_EQUAL:
+ return ("-" + key + ":" + queryValue);
+ case LESS_THAN:
+ //use right curly to mean up to but not including value
+ return (key + ":[* TO " + queryValue + "}");
+ case LESS_THAN_EQUAL:
+ return (key + ":[* TO " + queryValue + "]");
+ case GREATER_THAN:
+ //use left curly to mean greater than but not including value
+ return (key + ":{" + queryValue + " TO *]");
+ case GREATER_THAN_EQUAL:
+ return (key + ":[" + queryValue + " TO *]");
+ default: throw new IllegalArgumentException("Unexpected relation: " + numRel);
+ }
+ } else if (value instanceof Boolean) {
+ // NOTE(review): unlike the Number and Date branches there is no instanceof check before
+ // this cast, so a non-Cmp predicate raises ClassCastException rather than IllegalArgumentException.
+ Cmp numRel = (Cmp) titanPredicate;
+ String queryValue = escapeValue(value);
+ switch (numRel) {
+ case EQUAL:
+ return (key + ":" + queryValue);
+ case NOT_EQUAL:
+ return ("-" + key + ":" + queryValue);
+ default:
+ throw new IllegalArgumentException("Boolean types only support EQUAL or NOT_EQUAL");
+ }
+ } else if (value instanceof UUID) {
+ if (titanPredicate == Cmp.EQUAL) {
+ return (key + ":\"" + escapeValue(value) + "\"");
+ } else if (titanPredicate == Cmp.NOT_EQUAL) {
+ return ("-" + key + ":\"" + escapeValue(value) + "\"");
+ } else {
+ throw new IllegalArgumentException("Relation is not supported for uuid value: " + titanPredicate);
+ }
+ } else throw new IllegalArgumentException("Unsupported type: " + value);
+ } else if (condition instanceof Not) {
+ // Negate the child by wrapping it as -(child); a blank child yields a blank result.
+ String sub = buildQueryFilter(((Not)condition).getChild(),informations);
+ if (StringUtils.isNotBlank(sub)) return "-("+sub+")";
+ else return "";
+ } else if (condition instanceof And) {
+ int numChildren = ((And) condition).size();
+ StringBuilder sb = new StringBuilder();
+ for (Condition<TitanElement> c : condition.getChildren()) {
+ String sub = buildQueryFilter(c, informations);
+
+ if (StringUtils.isBlank(sub))
+ continue;
+
+ // we don't have to add "+" which means AND iff
+ // a. it's a NOT query,
+ // b. expression is a single statement in the AND.
+ if (!sub.startsWith("-") && numChildren > 1)
+ sb.append("+");
+
+ sb.append(sub).append(" ");
+ }
+ return sb.toString();
+ } else if (condition instanceof Or) {
+ // Non-blank children are joined as "(a OR b OR ...)"; the parentheses are only
+ // emitted when at least one child produced output.
+ StringBuilder sb = new StringBuilder();
+ int element=0;
+ for (Condition<TitanElement> c : condition.getChildren()) {
+ String sub = buildQueryFilter(c,informations);
+ if (StringUtils.isBlank(sub)) continue;
+ if (element==0) sb.append("(");
+ else sb.append(" OR ");
+ sb.append(sub);
+ element++;
+ }
+ if (element>0) sb.append(")");
+ return sb.toString();
+ } else {
+ throw new IllegalArgumentException("Invalid condition: " + condition);
+ }
+ // Only reachable from the Geoshape branch when the shape type matches none of the handled types.
+ return null;
+ }
+
+ /**
+  * Formats a date as a UTC timestamp of the form {@code yyyy-MM-dd'T'HH:mm:ss'Z'}.
+  * <p>
+  * The formatter is pinned to {@code Locale.ROOT}. Without an explicit locale,
+  * {@code SimpleDateFormat} uses the JVM default locale, whose calendar may not be
+  * Gregorian (e.g. the Thai Buddhist calendar), which would put a wrong year into the query.
+  *
+  * @param value the date to format
+  * @return the formatted timestamp, always expressed in UTC
+  */
+ private String toIsoDate(Date value) {
+     // A new formatter per call: SimpleDateFormat is not thread-safe.
+     DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'", java.util.Locale.ROOT);
+     df.setTimeZone(TimeZone.getTimeZone("UTC"));
+     return df.format(value);
+ }
+
+ /**
+  * Collects every point of the given shape, in index order.
+  * <p>
+  * Points are read one index at a time until {@code getPoint} signals the end of the
+  * coordinate list by throwing {@link ArrayIndexOutOfBoundsException}.
+  *
+  * @param polygon the shape whose points are read
+  * @return the points in index order; empty if the very first lookup is already out of bounds
+  */
+ private List<Geoshape.Point> getPolygonPoints(Geoshape polygon) {
+     List<Geoshape.Point> locations = new ArrayList<Geoshape.Point>();
+
+     int index = 0;
+     boolean hasCoordinates = true;
+     while (hasCoordinates) {
+         try {
+             locations.add(polygon.getPoint(index));
+             // Advance to the next point. Without this the loop re-adds point 0 forever
+             // and never reaches the out-of-bounds exit below.
+             index++;
+         } catch (ArrayIndexOutOfBoundsException ignore) {
+             //just means we asked for a point past the size of the list
+             //of known coordinates
+             hasCoordinates = false;
+         }
+     }
+
+     return locations;
+ }
+
+ /**
+ * Opens a new transaction handle for this index.
+ * <p>
+ * Solr handles all transactions on the server side. That means every
+ * commit, optimize, or rollback applies to all changes made since the last
+ * commit/optimize/rollback. The Solr documentation recommends updating Solr
+ * from a single process to avoid race conditions.
+ *
+ * @param config configuration handed to the returned {@link DefaultTransaction}
+ * @return New Transaction Handle
+ * @throws com.thinkaurelius.titan.diskstorage.BackendException
+ */
+ @Override
+ public BaseTransactionConfigurable beginTransaction(BaseTransactionConfig config) throws BackendException {
+ return new DefaultTransaction(config);
+ }
+
+ /**
+  * Closes the underlying Solr client.
+  *
+  * @throws BackendException (temporary) if closing the client fails with an I/O error
+  */
+ @Override
+ public void close() throws BackendException {
+     // The "{}" placeholder is required; without it the client argument is never rendered.
+     logger.trace("Shutting down connection to Solr: {}", solrClient);
+     try {
+         solrClient.close();
+     } catch (IOException e) {
+         throw new TemporaryBackendException(e);
+     }
+ }
+
+ /**
+  * Deletes every document from every collection known to the cluster state.
+  * <p>
+  * Only available in SolrCloud mode. Note that the mode check sits inside the try block,
+  * so the {@link UnsupportedOperationException} it raises is caught by the generic handler
+  * below and surfaces to callers as a {@link PermanentBackendException}.
+  *
+  * @throws BackendException (permanent) on any failure, including use outside SolrCloud mode
+  */
+ @Override
+ public void clearStorage() throws BackendException {
+     try {
+         if (mode != Mode.CLOUD) {
+             throw new UnsupportedOperationException("Operation only supported for SolrCloud");
+         }
+         logger.debug("Clearing storage from Solr: {}", solrClient);
+
+         // Refresh the cluster state before enumerating the collections.
+         ZkStateReader stateReader = ((CloudSolrClient) solrClient).getZkStateReader();
+         stateReader.updateClusterState(true);
+         for (String collection : stateReader.getClusterState().getCollections()) {
+             logger.debug("Clearing collection [{}] in Solr",collection);
+             UpdateRequest wipe = newUpdateRequest();
+             wipe.deleteByQuery("*:*");
+             solrClient.request(wipe, collection);
+         }
+     } catch (SolrServerException e) {
+         logger.error("Unable to clear storage from index due to server error on Solr.", e);
+         throw new PermanentBackendException(e);
+     } catch (IOException e) {
+         logger.error("Unable to clear storage from index due to low-level I/O error.", e);
+         throw new PermanentBackendException(e);
+     } catch (Exception e) {
+         logger.error("Unable to clear storage from index due to general error.", e);
+         throw new PermanentBackendException(e);
+     }
+ }
+
+ /**
+  * Tells whether the given predicate can be evaluated by this index for a key with the
+  * given data type and mapping.
+  * <p>
+  * Non-string types must use the DEFAULT mapping. Numbers and dates accept any {@code Cmp}
+  * predicate, geoshapes accept only {@code Geo.WITHIN}, booleans and UUIDs accept only
+  * EQUAL / NOT_EQUAL, and strings depend on whether they are mapped as TEXT or STRING.
+  */
+ @Override
+ public boolean supports(KeyInformation information, TitanPredicate titanPredicate) {
+     Class<?> dataType = information.getDataType();
+     Mapping mapping = Mapping.getMapping(information);
+
+     // Explicit mappings only make sense for string keys.
+     if (mapping != Mapping.DEFAULT && !AttributeUtil.isString(dataType)) {
+         return false;
+     }
+
+     if (Number.class.isAssignableFrom(dataType)) {
+         return titanPredicate instanceof Cmp;
+     }
+     if (dataType == Geoshape.class) {
+         return titanPredicate == Geo.WITHIN;
+     }
+     if (AttributeUtil.isString(dataType)) {
+         switch (mapping) {
+             case DEFAULT:
+             case TEXT:
+                 return titanPredicate == Text.CONTAINS || titanPredicate == Text.CONTAINS_PREFIX || titanPredicate == Text.CONTAINS_REGEX;
+             case STRING:
+                 return titanPredicate == Cmp.EQUAL || titanPredicate==Cmp.NOT_EQUAL || titanPredicate==Text.REGEX || titanPredicate==Text.PREFIX;
+             default:
+                 // Any other string mapping is not supported.
+                 return false;
+         }
+     }
+     if (dataType == Date.class) {
+         return titanPredicate instanceof Cmp;
+     }
+     if (dataType == Boolean.class || dataType == UUID.class) {
+         return titanPredicate == Cmp.EQUAL || titanPredicate == Cmp.NOT_EQUAL;
+     }
+     return false;
+ }
+
+ @Override
+ public boolean supports(KeyInformation information) {
+ Class<?> dataType = information.getDataType();
+ Mapping mapping = Mapping.getMapping(information);
+ if (Number.class.isAssignableFrom(dataType) || dataType == Geoshape.class || dataType == Date.class || dataType == Boolean.class || dataType == UUID.class) {
+ if (mapping==Mapping.DEFAULT) return true;
+ } else if (AttributeUtil.isString(dataType)) {
+ if (mapping==Mapping.DEFAULT || mapping==Mapping.TEXT || mapping==Mapping.STRING) return true;
+ }
+ return false;
+ }
+
+ @Override
+ public String mapKey2Field(String key, KeyInformation keyInfo) {
+ Preconditions.checkArgument(!StringUtils.containsAny(key, new char[]{' '}),"Invalid key name provided: %s",key);
+ if (!dynFields) return key;
+ if (ParameterType.MAPPED_NAME.hasParameter(keyInfo.getParameters())) return key;
+ String postfix;
+ Class datatype = keyInfo.getDataType();
+ if (AttributeUtil.isString(datatype)) {
+ Mapping map = getStringMapping(keyInfo);
+ switch (map) {
+ case TEXT: postfix = "_t"; break;
+ case STRING: postfix = "_s"; break;
+ default: throw new IllegalArgumentException("Unsupported string mapping: " + map);
+ }
+ } else if (AttributeUtil.isWholeNumber(datatype)) {
+ if (datatype.equals(Long.class)) postfix = "_l";
+ else postfix = "_i";
+ } else if (AttributeUtil.isDecimal(datatype)) {
+ if (datatype.equals(Float.class)) postfix = "_f";
+ else postfix = "_d";
+ } else if (datatype.equals(Geoshape.class)) {
+ postfix = "_g";
+ } else if (datatype.equals(Date.class)) {
+ postfix = "_dt";
+ } else if (datatype.equals(Boolean.class)) {
+ postfix = "_b";
+ } else if (datatype.equals(UUID.class)) {
+ postfix = "_uuid";
+ } else throw new IllegalArgumentException("Unsupported data type ["+datatype+"] for field: " + key);
+ return key+postfix;
+ }
+
+ /**
+ * Returns the feature set advertised by this index backend.
+ *
+ * @return SOLR_FEATURES
+ */
+ @Override
+ public IndexFeatures getFeatures() {
+ return SOLR_FEATURES;
+ }
+
+ /*
+ ################# UTILITY METHODS #######################
+ */
+
+ /**
+  * Resolves the mapping of a string-typed key, treating DEFAULT as TEXT.
+  */
+ private static Mapping getStringMapping(KeyInformation information) {
+     assert AttributeUtil.isString(information.getDataType());
+     Mapping declared = Mapping.getMapping(information);
+     return (declared == Mapping.DEFAULT) ? Mapping.TEXT : declared;
+ }
+
+ /**
+ * Creates an update request that already carries a COMMIT action.
+ *
+ * @return a fresh request with the commit action set
+ */
+ private UpdateRequest newUpdateRequest() {
+ UpdateRequest req = new UpdateRequest();
+ // Always commit together with the request; both boolean flags are passed as true.
+ req.setAction(UpdateRequest.ACTION.COMMIT, true, true);
+ // NOTE(review): this repeats the call above with identical arguments, so the
+ // waitSearcher setting does not appear to change the request — confirm the intent.
+ if (waitSearcher) {
+ req.setAction(UpdateRequest.ACTION.COMMIT, true, true);
+ }
+ return req;
+ }
+
+ /**
+  * Wraps a Solr failure in a {@link TemporaryBackendException} with a fixed message.
+  */
+ private BackendException storageException(Exception solrException) {
+     BackendException wrapped = new TemporaryBackendException("Unable to complete query on Solr.", solrException);
+     return wrapped;
+ }
+
+ /**
+  * Creates the collection when the cluster state does not list it yet, then waits until
+  * its replicas have finished recovering.
+  * <p>
+  * The collection name doubles as the config name; shard count, max shards per node and
+  * replication factor are read from the configuration.
+  *
+  * @throws SolrServerException if the create request is not successful
+  */
+ private static void createCollectionIfNotExists(CloudSolrClient client, Configuration config, String collection)
+         throws IOException, SolrServerException, KeeperException, InterruptedException {
+     boolean alreadyThere = checkIfCollectionExists(client, collection);
+     if (!alreadyThere) {
+         Integer shardCount = config.get(NUM_SHARDS);
+         Integer shardsPerNode = config.get(MAX_SHARDS_PER_NODE);
+         Integer replicas = config.get(REPLICATION_FACTOR);
+
+         CollectionAdminRequest.Create request = new CollectionAdminRequest.Create();
+         request.setConfigName(collection);
+         request.setCollectionName(collection);
+         request.setNumShards(shardCount);
+         request.setMaxShardsPerNode(shardsPerNode);
+         request.setReplicationFactor(replicas);
+
+         CollectionAdminResponse response = request.process(client);
+         if (!response.isSuccess()) {
+             throw new SolrServerException(Joiner.on("\n").join(response.getErrorMessages()));
+         }
+         logger.trace("Collection {} successfully created.", collection);
+     }
+
+     // Runs for new and pre-existing collections alike.
+     waitForRecoveriesToFinish(client, collection);
+ }
+
+ /**
+  * Reports whether the freshly reloaded cluster state contains the named collection.
+  */
+ private static boolean checkIfCollectionExists(CloudSolrClient server, String collection) throws KeeperException, InterruptedException {
+     ZkStateReader stateReader = server.getZkStateReader();
+     // Reload first so the lookup does not run against a stale view.
+     stateReader.updateClusterState(true);
+     return stateReader.getClusterState().getCollectionOrNull(collection) != null;
+ }
+
+ /**
+  * Wait for all the collection shards to be ready.
+  * <p>
+  * Reloads the cluster state once per second until no replica that lives on a live node
+  * is in the RECOVERING, SYNC or DOWN state. There is no timeout: the method only returns
+  * once that holds, or when interrupted.
+  *
+  * @param server     client whose ZooKeeper state reader is polled
+  * @param collection name of the collection to wait for
+  * @throws NullPointerException if the cluster state has no slices for the collection
+  * @throws InterruptedException if the thread is interrupted while sleeping between polls
+  */
+ private static void waitForRecoveriesToFinish(CloudSolrClient server, String collection) throws KeeperException, InterruptedException {
+     ZkStateReader zkStateReader = server.getZkStateReader();
+     try {
+         boolean cont = true;
+
+         while (cont) {
+             boolean sawLiveRecovering = false;
+             zkStateReader.updateClusterState(true);
+             ClusterState clusterState = zkStateReader.getClusterState();
+             Map<String, Slice> slices = clusterState.getSlicesMap(collection);
+             // checkNotNull takes the reference first and the message second. The previous
+             // argument order checked the (never-null) message string instead of the map.
+             Preconditions.checkNotNull(slices, "Could not find collection:" + collection);
+
+             for (Map.Entry<String, Slice> entry : slices.entrySet()) {
+                 Map<String, Replica> shards = entry.getValue().getReplicasMap();
+                 for (Map.Entry<String, Replica> shard : shards.entrySet()) {
+                     String state = shard.getValue().getStr(ZkStateReader.STATE_PROP);
+                     // Replicas on nodes that are not live are ignored; they cannot recover anyway.
+                     if ((state.equals(ZkStateReader.RECOVERING)
+                             || state.equals(ZkStateReader.SYNC) || state
+                             .equals(ZkStateReader.DOWN))
+                             && clusterState.liveNodesContain(shard.getValue().getStr(
+                             ZkStateReader.NODE_NAME_PROP))) {
+                         sawLiveRecovering = true;
+                     }
+                 }
+             }
+             if (!sawLiveRecovering) {
+                 cont = false;
+             } else {
+                 Thread.sleep(1000);
+             }
+         }
+     } finally {
+         logger.info("Exiting solr wait");
+     }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/48343db9/repository/src/main/java/org/apache/atlas/repository/graph/TitanGraphProvider.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/TitanGraphProvider.java b/repository/src/main/java/org/apache/atlas/repository/graph/TitanGraphProvider.java
index 5e61b9a..6605ae7 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/TitanGraphProvider.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/TitanGraphProvider.java
@@ -18,9 +18,12 @@
package org.apache.atlas.repository.graph;
+import com.google.common.collect.ImmutableMap;
import com.google.inject.Provides;
import com.thinkaurelius.titan.core.TitanFactory;
import com.thinkaurelius.titan.core.TitanGraph;
+import com.thinkaurelius.titan.diskstorage.StandardIndexProvider;
+import com.thinkaurelius.titan.diskstorage.solr.Solr5Index;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.commons.configuration.Configuration;
@@ -28,6 +31,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Singleton;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.HashMap;
+import java.util.Map;
/**
* Default implementation for Graph Provider that doles out Titan Graph.
@@ -48,6 +55,35 @@ public class TitanGraphProvider implements GraphProvider<TitanGraph> {
return ApplicationProperties.getSubsetConfiguration(configProperties, GRAPH_PREFIX);
}
+ // Register the solr5 index backend as soon as this class is loaded.
+ static {
+ addSolr5Index();
+ }
+
+ /**
+  * Titan resolves an index backend name to its implementation through
+  * StandardIndexProvider.ALL_MANAGER_CLASSES, which is a private static final ImmutableMap.
+  * The only way to inject Solr5Index is to replace that field, so reflection is used to
+  * strip the final modifier and install a copy of the map that also contains "solr5".
+  *
+  * @throws RuntimeException wrapping any failure of the reflective update
+  */
+ private static void addSolr5Index() {
+     try {
+         Field field = StandardIndexProvider.class.getDeclaredField("ALL_MANAGER_CLASSES");
+         field.setAccessible(true);
+
+         // Clear the FINAL bit on the Field object so the static final field can be reassigned.
+         Field modifiersField = Field.class.getDeclaredField("modifiers");
+         modifiersField.setAccessible(true);
+         modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
+
+         // Typed copy of the existing providers (avoids the raw-type unchecked conversion).
+         Map<String, String> customMap = new HashMap<String, String>(StandardIndexProvider.getAllProviderClasses());
+         customMap.put("solr5", Solr5Index.class.getName());
+         ImmutableMap<String, String> immap = ImmutableMap.copyOf(customMap);
+         field.set(null, immap);
+
+         LOG.debug("Injected solr5 index - {}", Solr5Index.class.getName());
+     } catch(Exception e) {
+         throw new RuntimeException(e);
+     }
+ }
+
@Override
@Singleton
@Provides
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/48343db9/src/conf/solr/currency.xml
----------------------------------------------------------------------
diff --git a/src/conf/solr/currency.xml b/src/conf/solr/currency.xml
new file mode 100644
index 0000000..3a9c58a
--- /dev/null
+++ b/src/conf/solr/currency.xml
@@ -0,0 +1,67 @@
+<?xml version="1.0" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- Example exchange rates file for CurrencyField type named "currency" in example schema -->
+
+<currencyConfig version="1.0">
+ <rates>
+ <!-- Updated from http://www.exchangerate.com/ at 2011-09-27 -->
+ <rate from="USD" to="ARS" rate="4.333871" comment="ARGENTINA Peso" />
+ <rate from="USD" to="AUD" rate="1.025768" comment="AUSTRALIA Dollar" />
+ <rate from="USD" to="EUR" rate="0.743676" comment="European Euro" />
+ <rate from="USD" to="BRL" rate="1.881093" comment="BRAZIL Real" />
+ <rate from="USD" to="CAD" rate="1.030815" comment="CANADA Dollar" />
+ <rate from="USD" to="CLP" rate="519.0996" comment="CHILE Peso" />
+ <rate from="USD" to="CNY" rate="6.387310" comment="CHINA Yuan" />
+ <rate from="USD" to="CZK" rate="18.47134" comment="CZECH REP. Koruna" />
+ <rate from="USD" to="DKK" rate="5.515436" comment="DENMARK Krone" />
+ <rate from="USD" to="HKD" rate="7.801922" comment="HONG KONG Dollar" />
+ <rate from="USD" to="HUF" rate="215.6169" comment="HUNGARY Forint" />
+ <rate from="USD" to="ISK" rate="118.1280" comment="ICELAND Krona" />
+ <rate from="USD" to="INR" rate="49.49088" comment="INDIA Rupee" />
+ <rate from="USD" to="XDR" rate="0.641358" comment="INTNL MON. FUND SDR" />
+ <rate from="USD" to="ILS" rate="3.709739" comment="ISRAEL Sheqel" />
+ <rate from="USD" to="JPY" rate="76.32419" comment="JAPAN Yen" />
+ <rate from="USD" to="KRW" rate="1169.173" comment="KOREA (SOUTH) Won" />
+ <rate from="USD" to="KWD" rate="0.275142" comment="KUWAIT Dinar" />
+ <rate from="USD" to="MXN" rate="13.85895" comment="MEXICO Peso" />
+ <rate from="USD" to="NZD" rate="1.285159" comment="NEW ZEALAND Dollar" />
+ <rate from="USD" to="NOK" rate="5.859035" comment="NORWAY Krone" />
+ <rate from="USD" to="PKR" rate="87.57007" comment="PAKISTAN Rupee" />
+ <rate from="USD" to="PEN" rate="2.730683" comment="PERU Sol" />
+ <rate from="USD" to="PHP" rate="43.62039" comment="PHILIPPINES Peso" />
+ <rate from="USD" to="PLN" rate="3.310139" comment="POLAND Zloty" />
+ <rate from="USD" to="RON" rate="3.100932" comment="ROMANIA Leu" />
+ <rate from="USD" to="RUB" rate="32.14663" comment="RUSSIA Ruble" />
+ <rate from="USD" to="SAR" rate="3.750465" comment="SAUDI ARABIA Riyal" />
+ <rate from="USD" to="SGD" rate="1.299352" comment="SINGAPORE Dollar" />
+ <rate from="USD" to="ZAR" rate="8.329761" comment="SOUTH AFRICA Rand" />
+ <rate from="USD" to="SEK" rate="6.883442" comment="SWEDEN Krona" />
+ <rate from="USD" to="CHF" rate="0.906035" comment="SWITZERLAND Franc" />
+ <rate from="USD" to="TWD" rate="30.40283" comment="TAIWAN Dollar" />
+ <rate from="USD" to="THB" rate="30.89487" comment="THAILAND Baht" />
+ <rate from="USD" to="AED" rate="3.672955" comment="U.A.E. Dirham" />
+ <rate from="USD" to="UAH" rate="7.988582" comment="UKRAINE Hryvnia" />
+ <rate from="USD" to="GBP" rate="0.647910" comment="UNITED KINGDOM Pound" />
+
+ <!-- Cross-rates for some common currencies -->
+ <rate from="EUR" to="GBP" rate="0.869914" />
+ <rate from="EUR" to="NOK" rate="7.800095" />
+ <rate from="GBP" to="NOK" rate="8.966508" />
+ </rates>
+</currencyConfig>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/48343db9/src/conf/solr/lang/stopwords_en.txt
----------------------------------------------------------------------
diff --git a/src/conf/solr/lang/stopwords_en.txt b/src/conf/solr/lang/stopwords_en.txt
new file mode 100644
index 0000000..2c164c0
--- /dev/null
+++ b/src/conf/solr/lang/stopwords_en.txt
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# a couple of test stopwords to test that the words are really being
+# configured from this file:
+stopworda
+stopwordb
+
+# Standard english stop words taken from Lucene's StopAnalyzer
+a
+an
+and
+are
+as
+at
+be
+but
+by
+for
+if
+in
+into
+is
+it
+no
+not
+of
+on
+or
+such
+that
+the
+their
+then
+there
+these
+they
+this
+to
+was
+will
+with
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/48343db9/src/conf/solr/protwords.txt
----------------------------------------------------------------------
diff --git a/src/conf/solr/protwords.txt b/src/conf/solr/protwords.txt
new file mode 100644
index 0000000..1dfc0ab
--- /dev/null
+++ b/src/conf/solr/protwords.txt
@@ -0,0 +1,21 @@
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+#-----------------------------------------------------------------------
+# Use a protected word file to protect against the stemmer reducing two
+# unrelated words to the same base word.
+
+# Some non-words that normally won't be encountered,
+# just to test that they won't be stemmed.
+dontstems
+zwhacky
+