You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/05/08 15:26:19 UTC
[02/50] [abbrv] lucene-solr:jira/solr-10262: SOLR-10536: stats Streaming Expression should work in non-SolrCloud mode
SOLR-10536: stats Streaming Expression should work in non-SolrCloud mode
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/b5e9b5aa
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/b5e9b5aa
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/b5e9b5aa
Branch: refs/heads/jira/solr-10262
Commit: b5e9b5aaf684951c3353959918bb0796a824a12e
Parents: 5d42177
Author: Joel Bernstein <jb...@apache.org>
Authored: Tue May 2 15:44:15 2017 -0400
Committer: Joel Bernstein <jb...@apache.org>
Committed: Tue May 2 20:34:28 2017 -0400
----------------------------------------------------------------------
.../client/solrj/io/stream/StatsStream.java | 65 +++++---
.../solrj/io/stream/StreamExpressionTest.java | 150 +++++++++++++++----
2 files changed, 172 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b5e9b5aa/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java
index f6b5818..cb46db4 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java
@@ -26,7 +26,7 @@ import java.util.Map.Entry;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
@@ -61,6 +61,7 @@ public class StatsStream extends TupleStream implements Expressible {
private Map<String, Metric> metricMap;
protected transient SolrClientCache cache;
protected transient CloudSolrClient cloudSolrClient;
+ protected StreamContext streamContext;
// Use StatsStream(String, String, SolrParams, Metric[])
@Deprecated
@@ -129,9 +130,12 @@ public class StatsStream extends TupleStream implements Expressible {
else if(zkHostExpression.getParameter() instanceof StreamExpressionValue){
zkHost = ((StreamExpressionValue)zkHostExpression.getParameter()).getValue();
}
+
+ /*
if(null == zkHost){
throw new IOException(String.format(Locale.ROOT,"invalid expression %s - zkHost not found for collection '%s'",expression,collectionName));
}
+ */
// metrics, optional - if not provided then why are you using this?
Metric[] metrics = new Metric[metricExpressions.size()];
@@ -195,6 +199,7 @@ public class StatsStream extends TupleStream implements Expressible {
}
public void setStreamContext(StreamContext context) {
+ streamContext = context;
cache = context.getSolrClientCache();
}
@@ -203,32 +208,56 @@ public class StatsStream extends TupleStream implements Expressible {
}
public void open() throws IOException {
- if(cache != null) {
- cloudSolrClient = cache.getCloudSolrClient(zkHost);
- } else {
- cloudSolrClient = new Builder()
- .withZkHost(zkHost)
- .build();
- }
-
ModifiableSolrParams paramsLoc = new ModifiableSolrParams(this.params);
addStats(paramsLoc, metrics);
paramsLoc.set("stats", "true");
paramsLoc.set("rows", "0");
- QueryRequest request = new QueryRequest(paramsLoc);
- try {
- NamedList response = cloudSolrClient.request(request, collection);
- this.tuple = getTuple(response);
- } catch (Exception e) {
- throw new IOException(e);
+ Map<String, List<String>> shardsMap = (Map<String, List<String>>)streamContext.get("shards");
+ if(shardsMap == null) {
+ QueryRequest request = new QueryRequest(paramsLoc);
+ CloudSolrClient cloudSolrClient = cache.getCloudSolrClient(zkHost);
+ try {
+ NamedList response = cloudSolrClient.request(request, collection);
+ this.tuple = getTuple(response);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ } else {
+ List<String> shards = shardsMap.get(collection);
+ HttpSolrClient client = cache.getHttpSolrClient(shards.get(0));
+
+ if(shards.size() > 1) {
+ String shardsParam = getShardString(shards);
+ paramsLoc.add("shards", shardsParam);
+ paramsLoc.add("distrib", "true");
+ }
+
+ QueryRequest request = new QueryRequest(paramsLoc);
+ try {
+ NamedList response = client.request(request);
+ this.tuple = getTuple(response);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
}
}
- public void close() throws IOException {
- if(cache == null) {
- cloudSolrClient.close();
+ private String getShardString(List<String> shards) {
+ StringBuilder builder = new StringBuilder();
+ for(String shard : shards) {
+ if(builder.length() > 0) {
+ builder.append(",");
+ }
+ builder.append(shard);
}
+ return builder.toString();
+ }
+
+
+
+ public void close() throws IOException {
+
}
public Tuple read() throws IOException {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b5e9b5aa/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
index 006c589..9ea8ca4 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
@@ -1695,38 +1695,138 @@ public class StreamExpressionTest extends SolrCloudTestCase {
StreamExpression expression;
TupleStream stream;
List<Tuple> tuples;
-
- expression = StreamExpressionParser.parse("stats(collection1, q=*:*, sum(a_i), sum(a_f), min(a_i), min(a_f), max(a_i), max(a_f), avg(a_i), avg(a_f), count(*))");
- stream = factory.constructStream(expression);
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache cache = new SolrClientCache();
+ try {
+ streamContext.setSolrClientCache(cache);
+ String expr = "stats(" + COLLECTIONORALIAS + ", q=*:*, sum(a_i), sum(a_f), min(a_i), min(a_f), max(a_i), max(a_f), avg(a_i), avg(a_f), count(*))";
+ expression = StreamExpressionParser.parse(expr);
+ stream = factory.constructStream(expression);
+ stream.setStreamContext(streamContext);
- tuples = getTuples(stream);
+ tuples = getTuples(stream);
- assert(tuples.size() == 1);
+ assert (tuples.size() == 1);
- //Test Long and Double Sums
+ //Test Long and Double Sums
- Tuple tuple = tuples.get(0);
+ Tuple tuple = tuples.get(0);
- Double sumi = tuple.getDouble("sum(a_i)");
- Double sumf = tuple.getDouble("sum(a_f)");
- Double mini = tuple.getDouble("min(a_i)");
- Double minf = tuple.getDouble("min(a_f)");
- Double maxi = tuple.getDouble("max(a_i)");
- Double maxf = tuple.getDouble("max(a_f)");
- Double avgi = tuple.getDouble("avg(a_i)");
- Double avgf = tuple.getDouble("avg(a_f)");
- Double count = tuple.getDouble("count(*)");
+ Double sumi = tuple.getDouble("sum(a_i)");
+ Double sumf = tuple.getDouble("sum(a_f)");
+ Double mini = tuple.getDouble("min(a_i)");
+ Double minf = tuple.getDouble("min(a_f)");
+ Double maxi = tuple.getDouble("max(a_i)");
+ Double maxf = tuple.getDouble("max(a_f)");
+ Double avgi = tuple.getDouble("avg(a_i)");
+ Double avgf = tuple.getDouble("avg(a_f)");
+ Double count = tuple.getDouble("count(*)");
+
+ assertTrue(sumi.longValue() == 70);
+ assertTrue(sumf.doubleValue() == 55.0D);
+ assertTrue(mini.doubleValue() == 0.0D);
+ assertTrue(minf.doubleValue() == 1.0D);
+ assertTrue(maxi.doubleValue() == 14.0D);
+ assertTrue(maxf.doubleValue() == 10.0D);
+ assertTrue(avgi.doubleValue() == 7.0D);
+ assertTrue(avgf.doubleValue() == 5.5D);
+ assertTrue(count.doubleValue() == 10);
- assertTrue(sumi.longValue() == 70);
- assertTrue(sumf.doubleValue() == 55.0D);
- assertTrue(mini.doubleValue() == 0.0D);
- assertTrue(minf.doubleValue() == 1.0D);
- assertTrue(maxi.doubleValue() == 14.0D);
- assertTrue(maxf.doubleValue() == 10.0D);
- assertTrue(avgi.doubleValue() == 7.0D);
- assertTrue(avgf.doubleValue() == 5.5D);
- assertTrue(count.doubleValue() == 10);
+ //Test with shards parameter
+ List<String> shardUrls = TupleStream.getShards(cluster.getZkServer().getZkAddress(), COLLECTIONORALIAS, streamContext);
+ expr = "stats(myCollection, q=*:*, sum(a_i), sum(a_f), min(a_i), min(a_f), max(a_i), max(a_f), avg(a_i), avg(a_f), count(*))";
+ Map<String, List<String>> shardsMap = new HashMap();
+ shardsMap.put("myCollection", shardUrls);
+ StreamContext context = new StreamContext();
+ context.put("shards", shardsMap);
+ context.setSolrClientCache(cache);
+ stream = factory.constructStream(expr);
+ stream.setStreamContext(context);
+
+ tuples = getTuples(stream);
+
+ assert (tuples.size() == 1);
+
+ //Test Long and Double Sums
+
+ tuple = tuples.get(0);
+
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+ assertTrue(sumi.longValue() == 70);
+ assertTrue(sumf.doubleValue() == 55.0D);
+ assertTrue(mini.doubleValue() == 0.0D);
+ assertTrue(minf.doubleValue() == 1.0D);
+ assertTrue(maxi.doubleValue() == 14.0D);
+ assertTrue(maxf.doubleValue() == 10.0D);
+ assertTrue(avgi.doubleValue() == 7.0D);
+ assertTrue(avgf.doubleValue() == 5.5D);
+ assertTrue(count.doubleValue() == 10);
+
+ //Exercise the /stream handler
+
+ //Add the shards http parameter for the myCollection
+ StringBuilder buf = new StringBuilder();
+ for (String shardUrl : shardUrls) {
+ if (buf.length() > 0) {
+ buf.append(",");
+ }
+ buf.append(shardUrl);
+ }
+
+ ModifiableSolrParams solrParams = new ModifiableSolrParams();
+ solrParams.add("qt", "/stream");
+ solrParams.add("expr", expr);
+ solrParams.add("myCollection.shards", buf.toString());
+ SolrStream solrStream = new SolrStream(shardUrls.get(0), solrParams);
+ tuples = getTuples(solrStream);
+ assert (tuples.size() == 1);
+
+ tuple =tuples.get(0);
+
+ sumi = tuple.getDouble("sum(a_i)");
+ sumf = tuple.getDouble("sum(a_f)");
+ mini = tuple.getDouble("min(a_i)");
+ minf = tuple.getDouble("min(a_f)");
+ maxi = tuple.getDouble("max(a_i)");
+ maxf = tuple.getDouble("max(a_f)");
+ avgi = tuple.getDouble("avg(a_i)");
+ avgf = tuple.getDouble("avg(a_f)");
+ count = tuple.getDouble("count(*)");
+
+ assertTrue(sumi.longValue() == 70);
+ assertTrue(sumf.doubleValue() == 55.0D);
+ assertTrue(mini.doubleValue() == 0.0D);
+ assertTrue(minf.doubleValue() == 1.0D);
+ assertTrue(maxi.doubleValue() == 14.0D);
+ assertTrue(maxf.doubleValue() == 10.0D);
+ assertTrue(avgi.doubleValue() == 7.0D);
+ assertTrue(avgf.doubleValue() == 5.5D);
+ assertTrue(count.doubleValue() == 10);
+ //Add a negative test to prove that it cannot find slices if shards parameter is removed
+
+ try {
+ ModifiableSolrParams solrParamsBad = new ModifiableSolrParams();
+ solrParamsBad.add("qt", "/stream");
+ solrParamsBad.add("expr", expr);
+ solrStream = new SolrStream(shardUrls.get(0), solrParamsBad);
+ tuples = getTuples(solrStream);
+ throw new Exception("Exception should have been thrown above");
+ } catch (IOException e) {
+ assertTrue(e.getMessage().contains("Collection not found: myCollection"));
+ }
+ } finally {
+ cache.close();
+ }
}
@Test