You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by kr...@apache.org on 2016/11/16 02:47:18 UTC
[3/3] lucene-solr:branch_6x: SOLR-9077: Streaming expressions should
support collection alias
SOLR-9077: Streaming expressions should support collection alias
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/e3db9f3b
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/e3db9f3b
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/e3db9f3b
Branch: refs/heads/branch_6x
Commit: e3db9f3b8a28e1de0b6fcd5cb358a948f7a23423
Parents: ead40a9
Author: Kevin Risden <kr...@apache.org>
Authored: Sun Oct 16 13:12:00 2016 -0400
Committer: Kevin Risden <kr...@apache.org>
Committed: Tue Nov 15 20:45:19 2016 -0600
----------------------------------------------------------------------
solr/CHANGES.txt | 2 +
.../solr/client/solrj/io/sql/StatementImpl.java | 9 +-
.../client/solrj/io/stream/CloudSolrStream.java | 54 +-
.../io/stream/FeaturesSelectionStream.java | 8 +-
.../client/solrj/io/stream/ParallelStream.java | 8 +-
.../solr/client/solrj/io/stream/SolrStream.java | 2 -
.../client/solrj/io/stream/TextLogitStream.java | 12 +-
.../client/solrj/io/stream/TopicStream.java | 41 +-
.../solr/client/solrj/io/sql/JdbcTest.java | 61 +-
.../client/solrj/io/stream/JDBCStreamTest.java | 43 +-
.../solrj/io/stream/StreamExpressionTest.java | 324 +++++-----
.../client/solrj/io/stream/StreamingTest.java | 588 ++++++++++---------
12 files changed, 601 insertions(+), 551 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e3db9f3b/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 81f9016..99bb0a9 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -65,6 +65,8 @@ New Features
* SOLR-9666: SolrJ LukeResponse support dynamic fields (Fengtan via Kevin Risden)
+* SOLR-9077: Streaming expressions should support collection alias (Kevin Risden)
+
Optimizations
----------------------
* SOLR-9704: Facet Module / JSON Facet API: Optimize blockChildren facets that have
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e3db9f3b/solr/solrj/src/java/org/apache/solr/client/solrj/io/sql/StatementImpl.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/sql/StatementImpl.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/sql/StatementImpl.java
index c05028d..a2c06d4 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/sql/StatementImpl.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/sql/StatementImpl.java
@@ -28,8 +28,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Random;
+import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
import org.apache.solr.client.solrj.io.stream.SolrStream;
-import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
@@ -78,12 +78,7 @@ class StatementImpl implements Statement {
protected SolrStream constructStream(String sql) throws IOException {
try {
ZkStateReader zkStateReader = this.connection.getClient().getZkStateReader();
- ClusterState clusterState = zkStateReader.getClusterState();
- Collection<Slice> slices = clusterState.getActiveSlices(this.connection.getCollection());
-
- if(slices == null) {
- throw new Exception("Collection not found:"+this.connection.getCollection());
- }
+ Collection<Slice> slices = CloudSolrStream.getSlices(this.connection.getCollection(), zkStateReader, true);
List<Replica> shuffler = new ArrayList<>();
for(Slice slice : slices) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e3db9f3b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
index 2fb56ee..0580122 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
@@ -49,6 +49,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
@@ -60,6 +61,7 @@ import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.SolrjNamedThreadFactory;
+import org.apache.solr.common.util.StrUtils;
/**
* Connects to Zookeeper to pick replicas from a specific collection to send the query to.
@@ -352,37 +354,57 @@ public class CloudSolrStream extends TupleStream implements Expressible {
}
}
- public static Collection<Slice> getSlicesIgnoreCase(String name, ClusterState clusterState) {
- for (String coll : clusterState.getCollectionStates().keySet()) {
- if (coll.equalsIgnoreCase(name)) {
- DocCollection collection = clusterState.getCollectionOrNull(coll);
- if (collection != null) return collection.getActiveSlices();
+ public static Collection<Slice> getSlices(String collectionName, ZkStateReader zkStateReader, boolean checkAlias) throws IOException {
+ ClusterState clusterState = zkStateReader.getClusterState();
+
+ Map<String, DocCollection> collectionsMap = clusterState.getCollectionsMap();
+
+ // Check collection case sensitive
+ if(collectionsMap.containsKey(collectionName)) {
+ return collectionsMap.get(collectionName).getActiveSlices();
+ }
+
+ // Check collection case insensitive
+ for(String collectionMapKey : collectionsMap.keySet()) {
+ if(collectionMapKey.equalsIgnoreCase(collectionName)) {
+ return collectionsMap.get(collectionMapKey).getActiveSlices();
}
}
- return null;
+
+ if(checkAlias) {
+ // check for collection alias
+ Aliases aliases = zkStateReader.getAliases();
+ String alias = aliases.getCollectionAlias(collectionName);
+ if (alias != null) {
+ Collection<Slice> slices = new ArrayList<>();
+
+ List<String> aliasList = StrUtils.splitSmart(alias, ",", true);
+ for (String aliasCollectionName : aliasList) {
+ // Add all active slices for this alias collection
+ slices.addAll(collectionsMap.get(aliasCollectionName).getActiveSlices());
+ }
+
+ return slices;
+ }
+ }
+
+ throw new IOException("Slices not found for " + collectionName);
}
protected void constructStreams() throws IOException {
-
try {
-
ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
ClusterState clusterState = zkStateReader.getClusterState();
- Set<String> liveNodes = clusterState.getLiveNodes();
- //System.out.println("Connected to zk an got cluster state.");
- Collection<Slice> slices = clusterState.getActiveSlices(this.collection);
- if (slices == null) slices = getSlicesIgnoreCase(this.collection, clusterState);
- if (slices == null) {
- throw new Exception("Collection not found:" + this.collection);
- }
+ Collection<Slice> slices = CloudSolrStream.getSlices(this.collection, zkStateReader, true);
ModifiableSolrParams mParams = new ModifiableSolrParams(params);
mParams.set("distrib", "false"); // We are the aggregator.
+ Set<String> liveNodes = clusterState.getLiveNodes();
for(Slice slice : slices) {
Collection<Replica> replicas = slice.getReplicas();
- List<Replica> shuffler = new ArrayList();
+ List<Replica> shuffler = new ArrayList<>();
for(Replica replica : replicas) {
if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName()))
shuffler.add(replica);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e3db9f3b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java
index e9949da..cfb3941 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java
@@ -250,17 +250,15 @@ public class FeaturesSelectionStream extends TupleStream implements Expressible{
}
private List<String> getShardUrls() throws IOException {
-
try {
-
ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
- ClusterState clusterState = zkStateReader.getClusterState();
- Collection<Slice> slices = clusterState.getActiveSlices(this.collection);
+ Collection<Slice> slices = CloudSolrStream.getSlices(this.collection, zkStateReader, false);
+
+ ClusterState clusterState = zkStateReader.getClusterState();
Set<String> liveNodes = clusterState.getLiveNodes();
List<String> baseUrls = new ArrayList<>();
-
for(Slice slice : slices) {
Collection<Replica> replicas = slice.getReplicas();
List<Replica> shuffler = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e3db9f3b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
index 3125ff0..10e80ad 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java
@@ -257,15 +257,17 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
}
protected void constructStreams() throws IOException {
-
try {
Object pushStream = ((Expressible) tupleStream).toExpression(streamFactory);
ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
+
+ Collection<Slice> slices = CloudSolrStream.getSlices(this.collection, zkStateReader, true);
+
ClusterState clusterState = zkStateReader.getClusterState();
Set<String> liveNodes = clusterState.getLiveNodes();
- Collection<Slice> slices = clusterState.getActiveSlices(this.collection);
- List<Replica> shuffler = new ArrayList();
+
+ List<Replica> shuffler = new ArrayList<>();
for(Slice slice : slices) {
Collection<Replica> replicas = slice.getReplicas();
for (Replica replica : replicas) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e3db9f3b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
index 4ce1051..6a21703 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
@@ -115,8 +115,6 @@ public class SolrStream extends TupleStream {
**/
public void open() throws IOException {
-
-
if(cache == null) {
client = new HttpSolrClient.Builder(baseUrl).build();
} else {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e3db9f3b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TextLogitStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TextLogitStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TextLogitStream.java
index ac4550b..c40f785 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TextLogitStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TextLogitStream.java
@@ -332,19 +332,18 @@ public class TextLogitStream extends TupleStream implements Expressible {
}
protected List<String> getShardUrls() throws IOException {
-
try {
-
ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
+
+ Collection<Slice> slices = CloudSolrStream.getSlices(this.collection, zkStateReader, false);
+
ClusterState clusterState = zkStateReader.getClusterState();
Set<String> liveNodes = clusterState.getLiveNodes();
- Collection<Slice> slices = clusterState.getActiveSlices(this.collection);
- List baseUrls = new ArrayList();
-
+ List<String> baseUrls = new ArrayList<>();
for(Slice slice : slices) {
Collection<Replica> replicas = slice.getReplicas();
- List<Replica> shuffler = new ArrayList();
+ List<Replica> shuffler = new ArrayList<>();
for(Replica replica : replicas) {
if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName())) {
shuffler.add(replica);
@@ -359,7 +358,6 @@ public class TextLogitStream extends TupleStream implements Expressible {
}
return baseUrls;
-
} catch (Exception e) {
throw new IOException(e);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e3db9f3b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
index d81391d..5ecee65 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -407,18 +406,21 @@ public class TopicStream extends CloudSolrStream implements Expressible {
}
private void getCheckpoints() throws IOException {
- this.checkpoints = new HashMap();
+ this.checkpoints = new HashMap<>();
ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
+
+ Collection<Slice> slices = CloudSolrStream.getSlices(this.collection, zkStateReader, false);
+
ClusterState clusterState = zkStateReader.getClusterState();
- Collection<Slice> slices = clusterState.getActiveSlices(collection);
+ Set<String> liveNodes = clusterState.getLiveNodes();
for(Slice slice : slices) {
String sliceName = slice.getName();
- long checkpoint = 0;
+ long checkpoint;
if(initialCheckpoint > -1) {
checkpoint = initialCheckpoint;
} else {
- checkpoint = getCheckpoint(slice, clusterState.getLiveNodes());
+ checkpoint = getCheckpoint(slice, liveNodes);
}
this.checkpoints.put(sliceName, checkpoint);
@@ -482,21 +484,19 @@ public class TopicStream extends CloudSolrStream implements Expressible {
}
private void getPersistedCheckpoints() throws IOException {
-
ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
+ Collection<Slice> slices = CloudSolrStream.getSlices(checkpointCollection, zkStateReader, false);
+
ClusterState clusterState = zkStateReader.getClusterState();
- Collection<Slice> slices = clusterState.getActiveSlices(checkpointCollection);
Set<String> liveNodes = clusterState.getLiveNodes();
+
OUTER:
for(Slice slice : slices) {
Collection<Replica> replicas = slice.getReplicas();
for(Replica replica : replicas) {
if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName())){
-
-
HttpSolrClient httpClient = streamContext.getSolrClientCache().getHttpSolrClient(replica.getCoreUrl());
try {
-
SolrDocument doc = httpClient.getById(id);
if(doc != null) {
List<String> checkpoints = (List<String>)doc.getFieldValue("checkpoint_ss");
@@ -505,7 +505,7 @@ public class TopicStream extends CloudSolrStream implements Expressible {
this.checkpoints.put(pair[0], Long.parseLong(pair[1]));
}
}
- }catch (Exception e) {
+ } catch (Exception e) {
throw new IOException(e);
}
break OUTER;
@@ -515,22 +515,10 @@ public class TopicStream extends CloudSolrStream implements Expressible {
}
protected void constructStreams() throws IOException {
-
try {
-
ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
- ClusterState clusterState = zkStateReader.getClusterState();
- Set<String> liveNodes = clusterState.getLiveNodes();
- //System.out.println("Connected to zk an got cluster state.");
-
- Collection<Slice> slices = clusterState.getActiveSlices(this.collection);
- if (slices == null) slices = getSlicesIgnoreCase(this.collection, clusterState);
- if (slices == null) {
- throw new Exception("Collection not found:" + this.collection);
- }
+ Collection<Slice> slices = CloudSolrStream.getSlices(this.collection, zkStateReader, false);
-
- Iterator<String> iterator = params.getParameterNamesIterator();
ModifiableSolrParams mParams = new ModifiableSolrParams(params);
mParams.set("distrib", "false"); // We are the aggregator.
String fl = mParams.get("fl");
@@ -542,12 +530,15 @@ public class TopicStream extends CloudSolrStream implements Expressible {
Random random = new Random();
+ ClusterState clusterState = zkStateReader.getClusterState();
+ Set<String> liveNodes = clusterState.getLiveNodes();
+
for(Slice slice : slices) {
ModifiableSolrParams localParams = new ModifiableSolrParams(mParams);
long checkpoint = checkpoints.get(slice.getName());
Collection<Replica> replicas = slice.getReplicas();
- List<Replica> shuffler = new ArrayList();
+ List<Replica> shuffler = new ArrayList<>();
for(Replica replica : replicas) {
if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName()))
shuffler.add(replica);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e3db9f3b/solr/solrj/src/test/org/apache/solr/client/solrj/io/sql/JdbcTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/sql/JdbcTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/sql/JdbcTest.java
index 41f3309..d0a600d 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/sql/JdbcTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/sql/JdbcTest.java
@@ -48,12 +48,10 @@ import org.junit.Test;
@LuceneTestCase.SuppressCodecs({"Lucene3x", "Lucene40", "Lucene41", "Lucene42", "Lucene45"})
public class JdbcTest extends SolrCloudTestCase {
- private static final String COLLECTION = "collection1";
+ private static final String COLLECTIONORALIAS = "collection1";
private static final String id = "id";
- private static final int TIMEOUT = 30;
-
private static String zkHost;
@BeforeClass
@@ -62,9 +60,18 @@ public class JdbcTest extends SolrCloudTestCase {
.addConfig("conf", getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("streaming").resolve("conf"))
.configure();
- CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1).process(cluster.getSolrClient());
- AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
- false, true, TIMEOUT);
+ String collection;
+ boolean useAlias = random().nextBoolean();
+ if(useAlias) {
+ collection = COLLECTIONORALIAS + "_collection";
+ CollectionAdminRequest.createAlias(COLLECTIONORALIAS, collection).process(cluster.getSolrClient());
+ } else {
+ collection = COLLECTIONORALIAS;
+ }
+
+ CollectionAdminRequest.createCollection(collection, "conf", 2, 1).process(cluster.getSolrClient());
+ AbstractDistribZkTestBase.waitForRecoveriesToFinish(collection, cluster.getSolrClient().getZkStateReader(),
+ false, true, DEFAULT_TIMEOUT);
new UpdateRequest()
.add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1", "testnull_i", null)
@@ -77,7 +84,7 @@ public class JdbcTest extends SolrCloudTestCase {
.add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8", "testnull_i", "8")
.add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9", "testnull_i", null)
.add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10", "testnull_i", "10")
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), collection);
zkHost = cluster.getZkServer().getZkAddress();
}
@@ -87,9 +94,9 @@ public class JdbcTest extends SolrCloudTestCase {
Properties props = new Properties();
- try (Connection con = DriverManager.getConnection("jdbc:solr://" + zkHost + "?collection=collection1", props)) {
+ try (Connection con = DriverManager.getConnection("jdbc:solr://" + zkHost + "?collection=" + COLLECTIONORALIAS, props)) {
try (Statement stmt = con.createStatement()) {
- try (ResultSet rs = stmt.executeQuery("select id, a_i, a_s, a_f from collection1 order by a_i desc limit 2")) {
+ try (ResultSet rs = stmt.executeQuery("select id, a_i, a_s, a_f from " + COLLECTIONORALIAS + " order by a_i desc limit 2")) {
assertTrue(rs.next());
assertEquals(14, rs.getLong("a_i"));
@@ -112,7 +119,7 @@ public class JdbcTest extends SolrCloudTestCase {
}
//Test statement reuse
- try (ResultSet rs = stmt.executeQuery("select id, a_i, a_s, a_f from collection1 order by a_i asc limit 2")) {
+ try (ResultSet rs = stmt.executeQuery("select id, a_i, a_s, a_f from " + COLLECTIONORALIAS + " order by a_i asc limit 2")) {
assertTrue(rs.next());
assertEquals(0, rs.getLong("a_i"));
@@ -137,7 +144,7 @@ public class JdbcTest extends SolrCloudTestCase {
//Test connection reuse
try (Statement stmt = con.createStatement()) {
- try (ResultSet rs = stmt.executeQuery("select id, a_i, a_s, a_f from collection1 order by a_i desc limit 2")) {
+ try (ResultSet rs = stmt.executeQuery("select id, a_i, a_s, a_f from " + COLLECTIONORALIAS + " order by a_i desc limit 2")) {
assertTrue(rs.next());
assertEquals(14, rs.getLong("a_i"));
@@ -153,7 +160,7 @@ public class JdbcTest extends SolrCloudTestCase {
//Test statement reuse
stmt.setMaxRows(2);
- try (ResultSet rs = stmt.executeQuery("select id, a_i, a_s, a_f from collection1 order by a_i asc")) {
+ try (ResultSet rs = stmt.executeQuery("select id, a_i, a_s, a_f from " + COLLECTIONORALIAS + " order by a_i asc")) {
assertTrue(rs.next());
assertEquals(0, rs.getLong("a_i"));
@@ -168,7 +175,7 @@ public class JdbcTest extends SolrCloudTestCase {
}
//Test simple loop. Since limit is set it will override the statement maxRows.
- try (ResultSet rs = stmt.executeQuery("select id, a_i, a_s, a_f from collection1 order by a_i asc LIMIT 100")) {
+ try (ResultSet rs = stmt.executeQuery("select id, a_i, a_s, a_f from " + COLLECTIONORALIAS + " order by a_i asc LIMIT 100")) {
int count = 0;
while (rs.next()) {
++count;
@@ -186,9 +193,9 @@ public class JdbcTest extends SolrCloudTestCase {
//Test facet aggregation
Properties props = new Properties();
props.put("aggregationMode", "facet");
- try (Connection con = DriverManager.getConnection("jdbc:solr://" + zkHost + "?collection=collection1", props)) {
+ try (Connection con = DriverManager.getConnection("jdbc:solr://" + zkHost + "?collection=" + COLLECTIONORALIAS, props)) {
try (Statement stmt = con.createStatement()) {
- try (ResultSet rs = stmt.executeQuery("select a_s, sum(a_f) from collection1 group by a_s " +
+ try (ResultSet rs = stmt.executeQuery("select a_s, sum(a_f) from " + COLLECTIONORALIAS + " group by a_s " +
"order by sum(a_f) desc")) {
assertTrue(rs.next());
@@ -226,9 +233,9 @@ public class JdbcTest extends SolrCloudTestCase {
Properties props = new Properties();
props.put("aggregationMode", "map_reduce");
props.put("numWorkers", "2");
- try (Connection con = DriverManager.getConnection("jdbc:solr://" + zkHost + "?collection=collection1", props)) {
+ try (Connection con = DriverManager.getConnection("jdbc:solr://" + zkHost + "?collection=" + COLLECTIONORALIAS, props)) {
try (Statement stmt = con.createStatement()) {
- try (ResultSet rs = stmt.executeQuery("select a_s, sum(a_f) from collection1 group by a_s " +
+ try (ResultSet rs = stmt.executeQuery("select a_s, sum(a_f) from " + COLLECTIONORALIAS + " group by a_s " +
"order by sum(a_f) desc")) {
assertTrue(rs.next());
@@ -264,7 +271,7 @@ public class JdbcTest extends SolrCloudTestCase {
//Test params on the url
try (Connection con = DriverManager.getConnection("jdbc:solr://" + zkHost +
- "?collection=collection1&aggregationMode=map_reduce&numWorkers=2")) {
+ "?collection=" + COLLECTIONORALIAS + "&aggregationMode=map_reduce&numWorkers=2")) {
Properties p = ((ConnectionImpl) con).getProperties();
@@ -272,7 +279,7 @@ public class JdbcTest extends SolrCloudTestCase {
assert (p.getProperty("numWorkers").equals("2"));
try (Statement stmt = con.createStatement()) {
- try (ResultSet rs = stmt.executeQuery("select a_s, sum(a_f) from collection1 group by a_s " +
+ try (ResultSet rs = stmt.executeQuery("select a_s, sum(a_f) from " + COLLECTIONORALIAS + " group by a_s " +
"order by sum(a_f) desc")) {
assertTrue(rs.next());
@@ -308,7 +315,7 @@ public class JdbcTest extends SolrCloudTestCase {
// Test JDBC paramters in URL
try (Connection con = DriverManager.getConnection(
- "jdbc:solr://" + zkHost + "?collection=collection1&username=&password=&testKey1=testValue&testKey2")) {
+ "jdbc:solr://" + zkHost + "?collection=" + COLLECTIONORALIAS + "&username=&password=&testKey1=testValue&testKey2")) {
Properties p = ((ConnectionImpl) con).getProperties();
assertEquals("", p.getProperty("username"));
@@ -317,7 +324,7 @@ public class JdbcTest extends SolrCloudTestCase {
assertEquals("", p.getProperty("testKey2"));
try (Statement stmt = con.createStatement()) {
- try (ResultSet rs = stmt.executeQuery("select a_s, sum(a_f) from collection1 group by a_s " +
+ try (ResultSet rs = stmt.executeQuery("select a_s, sum(a_f) from " + COLLECTIONORALIAS + " group by a_s " +
"order by sum(a_f) desc")) {
assertTrue(rs.next());
@@ -353,7 +360,7 @@ public class JdbcTest extends SolrCloudTestCase {
// Test JDBC paramters in properties
Properties providedProperties = new Properties();
- providedProperties.put("collection", "collection1");
+ providedProperties.put("collection", COLLECTIONORALIAS);
providedProperties.put("username", "");
providedProperties.put("password", "");
providedProperties.put("testKey1", "testValue");
@@ -367,7 +374,7 @@ public class JdbcTest extends SolrCloudTestCase {
assert (p.getProperty("testKey2").equals(""));
try (Statement stmt = con.createStatement()) {
- try (ResultSet rs = stmt.executeQuery("select a_s, sum(a_f) from collection1 group by a_s " +
+ try (ResultSet rs = stmt.executeQuery("select a_s, sum(a_f) from " + COLLECTIONORALIAS + " group by a_s " +
"order by sum(a_f) desc")) {
assertTrue(rs.next());
@@ -402,9 +409,9 @@ public class JdbcTest extends SolrCloudTestCase {
//Test error propagation
Properties props = new Properties();
props.put("aggregationMode", "facet");
- try (Connection con = DriverManager.getConnection("jdbc:solr://" + zkHost + "?collection=collection1", props)) {
+ try (Connection con = DriverManager.getConnection("jdbc:solr://" + zkHost + "?collection=" + COLLECTIONORALIAS, props)) {
try (Statement stmt = con.createStatement()) {
- try (ResultSet rs = stmt.executeQuery("select crap from collection1 group by a_s " +
+ try (ResultSet rs = stmt.executeQuery("select crap from " + COLLECTIONORALIAS + " group by a_s " +
"order by sum(a_f) desc")) {
} catch (Exception e) {
String errorMessage = e.getMessage();
@@ -416,7 +423,7 @@ public class JdbcTest extends SolrCloudTestCase {
@Test
public void testSQLExceptionThrownWhenQueryAndConnUseDiffCollections() throws Exception {
- String badCollection = COLLECTION + "bad";
+ String badCollection = COLLECTIONORALIAS + "bad";
String connectionString = "jdbc:solr://" + zkHost + "?collection=" + badCollection;
String sql = "select id, a_i, a_s, a_f from " + badCollection + " order by a_i desc limit 2";
@@ -434,7 +441,7 @@ public class JdbcTest extends SolrCloudTestCase {
@Test
public void testDriverMetadata() throws Exception {
- String collection = COLLECTION;
+ String collection = COLLECTIONORALIAS;
String connectionString1 = "jdbc:solr://" + zkHost + "?collection=" + collection +
"&username=&password=&testKey1=testValue&testKey2";
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e3db9f3b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java
index 980d0e7..924d53a 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java
@@ -50,7 +50,7 @@ import org.junit.Test;
@LuceneTestCase.SuppressCodecs({"Lucene3x", "Lucene40","Lucene41","Lucene42","Lucene45"})
public class JDBCStreamTest extends SolrCloudTestCase {
- private static final String COLLECTION = "jdbc";
+ private static final String COLLECTIONORALIAS = "jdbc";
private static final int TIMEOUT = 30;
@@ -62,8 +62,17 @@ public class JDBCStreamTest extends SolrCloudTestCase {
.addConfig("conf", getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("streaming").resolve("conf"))
.configure();
- CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1).process(cluster.getSolrClient());
- AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
+ String collection;
+ boolean useAlias = random().nextBoolean();
+ if(useAlias) {
+ collection = COLLECTIONORALIAS + "_collection";
+ CollectionAdminRequest.createAlias(COLLECTIONORALIAS, collection).process(cluster.getSolrClient());
+ } else {
+ collection = COLLECTIONORALIAS;
+ }
+
+ CollectionAdminRequest.createCollection(collection, "conf", 2, 1).process(cluster.getSolrClient());
+ AbstractDistribZkTestBase.waitForRecoveriesToFinish(collection, cluster.getSolrClient().getZkStateReader(),
false, true, TIMEOUT);
}
@@ -99,7 +108,7 @@ public class JDBCStreamTest extends SolrCloudTestCase {
public void cleanIndex() throws Exception {
new UpdateRequest()
.deleteByQuery("*:*")
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
}
@Before
@@ -200,10 +209,10 @@ public class JDBCStreamTest extends SolrCloudTestCase {
new UpdateRequest()
.add(id, "0", "code_s", "GB", "name_s", "Great Britian")
.add(id, "1", "code_s", "CA", "name_s", "Canada")
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
StreamFactory factory = new StreamFactory()
- .withCollectionZkHost(COLLECTION, cluster.getZkServer().getZkAddress())
+ .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
.withFunctionName("search", CloudSolrStream.class);
List<Tuple> tuples;
@@ -211,7 +220,7 @@ public class JDBCStreamTest extends SolrCloudTestCase {
// Simple 1
TupleStream jdbcStream = new JDBCStream("jdbc:hsqldb:mem:.", "select CODE,COUNTRY_NAME from COUNTRIES order by CODE", new FieldComparator("CODE", ComparatorOrder.ASCENDING));
TupleStream selectStream = new SelectStream(jdbcStream, new HashMap<String, String>(){{ put("CODE", "code_s"); put("COUNTRY_NAME", "name_s"); }});
- TupleStream searchStream = factory.constructStream("search(" + COLLECTION + ", fl=\"code_s,name_s\",q=\"*:*\",sort=\"code_s asc\")");
+ TupleStream searchStream = factory.constructStream("search(" + COLLECTIONORALIAS + ", fl=\"code_s,name_s\",q=\"*:*\",sort=\"code_s asc\")");
TupleStream mergeStream = new MergeStream(new FieldComparator("code_s", ComparatorOrder.ASCENDING), new TupleStream[]{selectStream,searchStream});
tuples = getTuples(mergeStream);
@@ -225,7 +234,7 @@ public class JDBCStreamTest extends SolrCloudTestCase {
public void testJDBCSolrInnerJoinExpression() throws Exception{
StreamFactory factory = new StreamFactory()
- .withCollectionZkHost(COLLECTION, cluster.getZkServer().getZkAddress())
+ .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("select", SelectStream.class)
.withFunctionName("innerJoin", InnerJoinStream.class)
@@ -262,7 +271,7 @@ public class JDBCStreamTest extends SolrCloudTestCase {
.add(id, "8", "rating_f", "4", "personId_i", "18")
.add(id, "9", "rating_f", "4.1", "personId_i", "19")
.add(id, "10", "rating_f", "4.8", "personId_i", "20")
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
String expression;
TupleStream stream;
@@ -272,7 +281,7 @@ public class JDBCStreamTest extends SolrCloudTestCase {
expression =
"innerJoin("
+ " select("
- + " search(" + COLLECTION + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
+ + " search(" + COLLECTIONORALIAS + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
+ " personId_i as personId,"
+ " rating_f as rating"
+ " ),"
@@ -299,7 +308,7 @@ public class JDBCStreamTest extends SolrCloudTestCase {
public void testJDBCSolrInnerJoinExpressionWithProperties() throws Exception{
StreamFactory factory = new StreamFactory()
- .withCollectionZkHost(COLLECTION, cluster.getZkServer().getZkAddress())
+ .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("select", SelectStream.class)
.withFunctionName("innerJoin", InnerJoinStream.class)
@@ -336,7 +345,7 @@ public class JDBCStreamTest extends SolrCloudTestCase {
.add(id, "8", "rating_f", "4", "personId_i", "18")
.add(id, "9", "rating_f", "4.1", "personId_i", "19")
.add(id, "10", "rating_f", "4.8", "personId_i", "20")
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
String expression;
TupleStream stream;
@@ -349,7 +358,7 @@ public class JDBCStreamTest extends SolrCloudTestCase {
expression =
"innerJoin("
+ " select("
- + " search(" + COLLECTION + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
+ + " search(" + COLLECTIONORALIAS + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
+ " personId_i as personId,"
+ " rating_f as rating"
+ " ),"
@@ -378,7 +387,7 @@ public class JDBCStreamTest extends SolrCloudTestCase {
expression =
"innerJoin("
+ " select("
- + " search(" + COLLECTION + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
+ + " search(" + COLLECTIONORALIAS + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
+ " personId_i as personId,"
+ " rating_f as rating"
+ " ),"
@@ -405,7 +414,7 @@ public class JDBCStreamTest extends SolrCloudTestCase {
public void testJDBCSolrInnerJoinRollupExpression() throws Exception{
StreamFactory factory = new StreamFactory()
- .withCollectionZkHost(COLLECTION, cluster.getZkServer().getZkAddress())
+ .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("select", SelectStream.class)
.withFunctionName("hashJoin", HashJoinStream.class)
@@ -448,7 +457,7 @@ public class JDBCStreamTest extends SolrCloudTestCase {
.add(id, "6", "rating_f", "3", "personId_i", "16")
.add(id, "7", "rating_f", "3", "personId_i", "17")
.add(id, "10", "rating_f", "4.8", "personId_i", "20")
- .commit(cluster.getSolrClient(), COLLECTION);
+ .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
String expression;
TupleStream stream;
@@ -459,7 +468,7 @@ public class JDBCStreamTest extends SolrCloudTestCase {
"rollup("
+ " hashJoin("
+ " hashed=select("
- + " search(" + COLLECTION + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
+ + " search(" + COLLECTIONORALIAS + ", fl=\"personId_i,rating_f\", q=\"rating_f:*\", sort=\"personId_i asc\"),"
+ " personId_i as personId,"
+ " rating_f as rating"
+ " ),"