You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by kr...@apache.org on 2016/04/28 17:38:45 UTC
[1/2] lucene-solr:jira/solr-8593: First commit for Calcite SQLHandler
integration
Repository: lucene-solr
Updated Branches:
refs/heads/jira/solr-8593 3e6de6059 -> b08a46363
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b08a4636/solr/core/src/java/org/apache/solr/handler/sql/SolrSchema.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/sql/SolrSchema.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrSchema.java
new file mode 100644
index 0000000..c84548f
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrSchema.java
@@ -0,0 +1,105 @@
+package org.apache.solr.handler.sql;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeImpl;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.impl.AbstractSchema;
+import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.LukeRequest;
+import org.apache.solr.client.solrj.response.LukeResponse;
+import org.apache.solr.common.luke.FieldFlag;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Calcite {@link AbstractSchema} backed by a SolrCloud cluster: every Solr
+ * collection found in the cluster state is exposed as a SQL table
+ * ({@link SolrTable}).
+ */
+class SolrSchema extends AbstractSchema {
+ // Shared client for the whole schema. NOTE(review): this client is never
+ // closed anywhere in this class -- confirm who owns its lifecycle.
+ final CloudSolrClient cloudSolrClient;
+
+ // zk is the ZooKeeper connect string used to locate the SolrCloud cluster.
+ SolrSchema(String zk) {
+ super();
+ this.cloudSolrClient = new CloudSolrClient(zk);
+ this.cloudSolrClient.connect();
+ }
+
+ /**
+ * Builds an immutable map of collection name to {@link SolrTable}, one entry
+ * per collection currently present in the cluster state.
+ */
+ @Override
+ protected Map<String, Table> getTableMap() {
+ // connect() was already called in the constructor; presumably idempotent
+ // here -- TODO confirm, otherwise this second call is redundant.
+ this.cloudSolrClient.connect();
+ Set<String> collections = this.cloudSolrClient.getZkStateReader().getClusterState().getCollections();
+ final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
+ for (String collection : collections) {
+ builder.put(collection, new SolrTable(this, collection));
+ }
+ return builder.build();
+ }
+
+ // Fetches per-field metadata for the collection via a Luke request.
+ // numTerms=0 keeps the response small: no term statistics are needed, only
+ // the field names, types and flags. Checked exceptions are rethrown as
+ // RuntimeException because callers (getRelDataType) cannot recover.
+ private Map<String, LukeResponse.FieldInfo> getFieldInfo(String collection) {
+ LukeRequest lukeRequest = new LukeRequest();
+ lukeRequest.setNumTerms(0);
+ LukeResponse lukeResponse;
+ try {
+ lukeResponse = lukeRequest.process(cloudSolrClient, collection);
+ } catch (SolrServerException | IOException e) {
+ throw new RuntimeException(e);
+ }
+ return lukeResponse.getFieldInfo();
+ }
+
+ /**
+ * Derives the Calcite row type for a collection from its Luke field info.
+ * Mapping: "string" -> String, "int"/"long" -> Long, anything else falls
+ * back to String. Multi-valued fields become array types; every field is
+ * marked nullable.
+ */
+ RelProtoDataType getRelDataType(String collection) {
+ // Temporary type factory, just for the duration of this method. Allowable
+ // because we're creating a proto-type, not a type; before being used, the
+ // proto-type will be copied into a real type factory.
+ final RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
+ final RelDataTypeFactory.FieldInfoBuilder fieldInfo = typeFactory.builder();
+ Map<String, LukeResponse.FieldInfo> luceneFieldInfoMap = getFieldInfo(collection);
+ for(Map.Entry<String, LukeResponse.FieldInfo> entry : luceneFieldInfoMap.entrySet()) {
+ LukeResponse.FieldInfo luceneFieldInfo = entry.getValue();
+
+ RelDataType type;
+ switch (luceneFieldInfo.getType()) {
+ case "string":
+ type = typeFactory.createJavaType(String.class);
+ break;
+ case "int":
+ case "long":
+ type = typeFactory.createJavaType(Long.class);
+ break;
+ default:
+ // Unknown Solr types are treated as strings so the schema still loads.
+ type = typeFactory.createJavaType(String.class);
+ }
+
+ EnumSet<FieldFlag> flags = luceneFieldInfo.getFlags();
+ if(flags != null && flags.contains(FieldFlag.MULTI_VALUED)) {
+ // -1 = unlimited cardinality for the array component.
+ type = typeFactory.createArrayType(type, -1);
+ }
+
+ fieldInfo.add(entry.getKey(), type).nullable(true);
+ }
+
+ return RelDataTypeImpl.proto(fieldInfo.build());
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b08a4636/solr/core/src/java/org/apache/solr/handler/sql/SolrSchemaFactory.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/sql/SolrSchemaFactory.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrSchemaFactory.java
new file mode 100644
index 0000000..f0a6ba0
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrSchemaFactory.java
@@ -0,0 +1,35 @@
+package org.apache.solr.handler.sql;
+
+import java.util.Map;
+
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaFactory;
+import org.apache.calcite.schema.SchemaPlus;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * {@link SchemaFactory} that Calcite instantiates reflectively from a model
+ * definition (hence the unused-declaration suppression) to create a
+ * {@link SolrSchema}. The operand map is expected to carry a "zk" entry:
+ * the ZooKeeper connect string of the SolrCloud cluster.
+ */
+@SuppressWarnings("UnusedDeclaration")
+public class SolrSchemaFactory implements SchemaFactory {
+ // Public no-arg constructor required for reflective instantiation.
+ public SolrSchemaFactory() {
+ }
+
+ public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) {
+ // NOTE(review): no null/type check -- a missing "zk" operand propagates a
+ // null into SolrSchema. Confirm the model always supplies it.
+ final String zk = (String) operand.get("zk");
+ return new SolrSchema(zk);
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b08a4636/solr/core/src/java/org/apache/solr/handler/sql/SolrSort.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/sql/SolrSort.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrSort.java
new file mode 100644
index 0000000..0945984
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrSort.java
@@ -0,0 +1,85 @@
+package org.apache.solr.handler.sql;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation of {@link org.apache.calcite.rel.core.Sort} relational
+ * expression in Solr. Translates a collation and optional fetch (LIMIT)
+ * into Solr sort clauses via {@link SolrRel.Implementor}.
+ */
+public class SolrSort extends Sort implements SolrRel {
+ // Collation implied by the underlying Solr result order (kept alongside the
+ // requested collation; carried through copy()).
+ private final RelCollation implicitCollation;
+
+ public SolrSort(RelOptCluster cluster, RelTraitSet traitSet, RelNode child, RelCollation collation,
+ RelCollation implicitCollation, RexNode fetch) {
+ // offset is always null: OFFSET is not supported by this operator.
+ super(cluster, traitSet, child, collation, null, fetch);
+
+ this.implicitCollation = implicitCollation;
+
+ assert getConvention() == SolrRel.CONVENTION;
+ assert getConvention() == child.getConvention();
+ }
+
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+ // Discounted cost, presumably so the planner favors pushing the sort down
+ // into Solr over an enumerable sort -- standard Calcite adapter idiom.
+ return super.computeSelfCost(planner, mq).multiplyBy(0.05);
+ }
+
+ @Override
+ public Sort copy(RelTraitSet traitSet, RelNode input, RelCollation newCollation, RexNode offset, RexNode fetch) {
+ // NOTE(review): the newCollation and offset parameters are ignored -- the
+ // copy reuses this.collation and drops offset. Confirm this is intended.
+ return new SolrSort(getCluster(), traitSet, input, collation, implicitCollation, fetch);
+ }
+
+ public void implement(Implementor implementor) {
+ implementor.visitChild(0, getInput());
+
+ List<RelFieldCollation> sortCollations = collation.getFieldCollations();
+ List<String> fieldOrder = new ArrayList<>();
+ if (!sortCollations.isEmpty()) {
+ // Construct a series of order clauses from the desired collation
+ final List<RelDataTypeField> fields = getRowType().getFieldList();
+ for (RelFieldCollation fieldCollation : sortCollations) {
+ final String name = fields.get(fieldCollation.getFieldIndex()).getName();
+ // Anything other than DESCENDING (including STRICTLY_* variants) maps
+ // to ASC.
+ String direction = "ASC";
+ if (fieldCollation.getDirection().equals(RelFieldCollation.Direction.DESCENDING)) {
+ direction = "DESC";
+ }
+ fieldOrder.add(name + " " + direction);
+ }
+
+ implementor.addOrder(fieldOrder);
+ }
+ if (fetch != null) {
+ // fetch is a literal (LIMIT n); pass its value through as a string.
+ implementor.setLimit(((RexLiteral) fetch).getValue().toString());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b08a4636/solr/core/src/java/org/apache/solr/handler/sql/SolrTable.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/sql/SolrTable.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrTable.java
new file mode 100644
index 0000000..c6838eb
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrTable.java
@@ -0,0 +1,171 @@
+package org.apache.solr.handler.sql;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.calcite.adapter.java.AbstractQueryableTable;
+import org.apache.calcite.linq4j.AbstractEnumerable;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.linq4j.QueryProvider;
+import org.apache.calcite.linq4j.Queryable;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.TranslatableTable;
+import org.apache.calcite.schema.impl.AbstractTableQueryable;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
+import org.apache.solr.client.solrj.io.stream.TupleStream;
+import org.apache.solr.common.params.CommonParams;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Table based on a Solr collection. Row type is derived lazily from the
+ * collection's Luke field info ({@link SolrSchema#getRelDataType}); queries
+ * are executed as streaming expressions over {@link CloudSolrStream}.
+ */
+public class SolrTable extends AbstractQueryableTable implements TranslatableTable {
+ private final String collection;
+ private final SolrSchema schema;
+ // Lazily initialized on first getRowType() call, then cached.
+ private RelProtoDataType protoRowType;
+
+ public SolrTable(SolrSchema schema, String collection) {
+ // Rows are materialized as Object[].
+ super(Object[].class);
+ this.schema = schema;
+ this.collection = collection;
+ }
+
+ public String toString() {
+ return "SolrTable {" + collection + "}";
+ }
+
+ public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+ if (protoRowType == null) {
+ protoRowType = schema.getRelDataType(collection);
+ }
+ return protoRowType.apply(typeFactory);
+ }
+
+ // Convenience overload: select everything, no filters, no ordering, no limit.
+ public Enumerable<Object> query(final CloudSolrClient cloudSolrClient) {
+ return query(cloudSolrClient, Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), null);
+ }
+
+ /** Executes a Solr query on the underlying table.
+ *
+ * @param cloudSolrClient Solr CloudSolrClient
+ * @param fields List of fields to project
+ * @param filterQueries A list of filterQueries which should be used in the query
+ * @param order List of "field ASC|DESC" clauses; empty means sort by _version_ desc
+ * @param limit Row limit as a string; NOTE(review): currently ignored (see commented-out block below)
+ * @return Enumerator of results
+ */
+ public Enumerable<Object> query(final CloudSolrClient cloudSolrClient, List<String> fields,
+ List<String> filterQueries, List<String> order, String limit) {
+ Map<String, String> solrParams = new HashMap<>();
+ solrParams.put(CommonParams.Q, "*:*");
+ //solrParams.put(CommonParams.QT, "/export");
+
+ if (fields.isEmpty()) {
+ solrParams.put(CommonParams.FL, "*");
+ } else {
+ solrParams.put(CommonParams.FL, String.join(",", fields));
+ }
+
+ if (filterQueries.isEmpty()) {
+ solrParams.put(CommonParams.FQ, "*:*");
+ } else {
+ // SolrParams should be a ModifiableParams instead of a map so we could add multiple FQs
+ solrParams.put(CommonParams.FQ, String.join(" OR ", filterQueries));
+ }
+
+ // Build and issue the query and return an Enumerator over the results
+ if (order.isEmpty()) {
+ // CloudSolrStream requires a sort; default to _version_ desc.
+ String DEFAULT_SORT_FIELD = "_version_";
+ solrParams.put(CommonParams.SORT, DEFAULT_SORT_FIELD + " desc");
+
+ // Make sure the default sort field is in the field list
+ // NOTE(review): substring check -- a field name merely containing
+ // "_version_" would false-positive. Confirm acceptable.
+ String fl = solrParams.get(CommonParams.FL);
+ if(!fl.contains(DEFAULT_SORT_FIELD)) {
+ solrParams.put(CommonParams.FL, String.join(",", fl, DEFAULT_SORT_FIELD));
+ }
+ } else {
+ solrParams.put(CommonParams.SORT, String.join(",", order));
+ }
+
+// if (limit != null) {
+// queryBuilder.append(" LIMIT ").append(limit);
+// }
+
+ return new AbstractEnumerable<Object>() {
+ public Enumerator<Object> enumerator() {
+ TupleStream cloudSolrStream;
+ try {
+ // Opened here; SolrEnumerator is expected to close it when iteration
+ // ends -- TODO confirm.
+ cloudSolrStream = new CloudSolrStream(cloudSolrClient.getZkHost(), collection, solrParams);
+ cloudSolrStream.open();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ return new SolrEnumerator(cloudSolrStream, fields);
+ }
+ };
+ }
+
+ public <T> Queryable<T> asQueryable(QueryProvider queryProvider, SchemaPlus schema, String tableName) {
+ return new SolrQueryable<>(queryProvider, schema, this, tableName);
+ }
+
+ // TranslatableTable: scans of this table enter the planner in Solr convention.
+ public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) {
+ final RelOptCluster cluster = context.getCluster();
+ return new SolrTableScan(cluster, cluster.traitSetOf(SolrRel.CONVENTION), relOptTable, this, null);
+ }
+
+ /** Queryable wrapper over a SolrTable; the generated code calls query() below. */
+ public static class SolrQueryable<T> extends AbstractTableQueryable<T> {
+ SolrQueryable(QueryProvider queryProvider, SchemaPlus schema, SolrTable table, String tableName) {
+ super(queryProvider, schema, table, tableName);
+ }
+
+ public Enumerator<T> enumerator() {
+ //noinspection unchecked
+ final Enumerable<T> enumerable = (Enumerable<T>) getTable().query(getCloudSolrClient());
+ return enumerable.enumerator();
+ }
+
+ private SolrTable getTable() {
+ return (SolrTable) table;
+ }
+
+ // Reaches back through the schema wrapper for the shared client.
+ private CloudSolrClient getCloudSolrClient() {
+ return schema.unwrap(SolrSchema.class).cloudSolrClient;
+ }
+
+ /** Called via code-generation.
+ *
+ * @see SolrMethod#SOLR_QUERYABLE_QUERY
+ */
+ @SuppressWarnings("UnusedDeclaration")
+ public Enumerable<Object> query(List<String> fields, List<String> filterQueries, List<String> order, String limit) {
+ return getTable().query(getCloudSolrClient(), fields, filterQueries, order, limit);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b08a4636/solr/core/src/java/org/apache/solr/handler/sql/SolrTableScan.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/sql/SolrTableScan.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrTableScan.java
new file mode 100644
index 0000000..4655a01
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrTableScan.java
@@ -0,0 +1,80 @@
+package org.apache.solr.handler.sql;
+
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.type.RelDataType;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Relational expression representing a scan of a Solr collection.
+ * Registers the Solr-specific planner rules the first time it enters a plan.
+ */
+class SolrTableScan extends TableScan implements SolrRel {
+ final SolrTable solrTable;
+ // Optional narrowed row type; null means the table's full row type is used.
+ final RelDataType projectRowType;
+
+ /**
+ * Creates a SolrTableScan.
+ *
+ * @param cluster Cluster
+ * @param traitSet Traits
+ * @param table Table
+ * @param solrTable Solr table
+ * @param projectRowType Fields and types to project; null to project raw row
+ */
+ SolrTableScan(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table, SolrTable solrTable,
+ RelDataType projectRowType) {
+ super(cluster, traitSet, table);
+ this.solrTable = solrTable;
+ this.projectRowType = projectRowType;
+
+ assert solrTable != null;
+ assert getConvention() == SolrRel.CONVENTION;
+ }
+
+ @Override
+ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ // A scan is a leaf node (no inputs) and is immutable, so copy is identity.
+ assert inputs.isEmpty();
+ return this;
+ }
+
+ @Override
+ public RelDataType deriveRowType() {
+ return projectRowType != null ? projectRowType : super.deriveRowType();
+ }
+
+ @Override
+ public void register(RelOptPlanner planner) {
+ // Make the planner aware of the Solr-to-enumerable bridge and all
+ // push-down rules for this adapter.
+ planner.addRule(SolrToEnumerableConverterRule.INSTANCE);
+ for (RelOptRule rule : SolrRules.RULES) {
+ planner.addRule(rule);
+ }
+ }
+
+ public void implement(Implementor implementor) {
+ // A scan contributes the table itself; filters/sorts are added by parents.
+ implementor.solrTable = solrTable;
+ implementor.table = table;
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b08a4636/solr/core/src/java/org/apache/solr/handler/sql/SolrToEnumerableConverter.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/sql/SolrToEnumerableConverter.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrToEnumerableConverter.java
new file mode 100644
index 0000000..6ee7908
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrToEnumerableConverter.java
@@ -0,0 +1,116 @@
+package org.apache.solr.handler.sql;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+import org.apache.calcite.adapter.enumerable.EnumerableRel;
+import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
+import org.apache.calcite.adapter.enumerable.JavaRowFormat;
+import org.apache.calcite.adapter.enumerable.PhysType;
+import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
+import org.apache.calcite.linq4j.tree.BlockBuilder;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.linq4j.tree.MethodCallExpression;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.CalcitePrepareImpl;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterImpl;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.runtime.Hook;
+import org.apache.calcite.util.BuiltInMethod;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Relational expression that converts a Solr-convention input into enumerable
+ * convention by generating Java code that calls
+ * {@link SolrTable.SolrQueryable#query}. (Previous javadoc incorrectly
+ * described this class as a table scan.)
+ */
+public class SolrToEnumerableConverter extends ConverterImpl implements EnumerableRel {
+ protected SolrToEnumerableConverter(RelOptCluster cluster, RelTraitSet traits, RelNode input) {
+ super(cluster, ConventionTraitDef.INSTANCE, traits, input);
+ }
+
+ @Override
+ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new SolrToEnumerableConverter(getCluster(), traitSet, sole(inputs));
+ }
+
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+ // Slight discount so the planner prefers this conversion when available.
+ return super.computeSelfCost(planner, mq).multiplyBy(.1);
+ }
+
+ public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
+ // Generates a call to "query" with the appropriate fields and filterQueries
+ final BlockBuilder list = new BlockBuilder();
+ // Walk the Solr subtree to collect fields, filters, order and limit.
+ final SolrRel.Implementor solrImplementor = new SolrRel.Implementor();
+ solrImplementor.visitChild(0, getInput());
+ final RelDataType rowType = getRowType();
+ final PhysType physType = PhysTypeImpl.of(implementor.getTypeFactory(), rowType, pref.prefer(JavaRowFormat.ARRAY));
+ final Expression table = list.append("table", solrImplementor.table.getExpression(SolrTable.SolrQueryable.class));
+ // Field names are remapped through fieldMappings (e.g. aliases) if present.
+ final Expression fields = list.append("fields",
+ constantArrayList(generateFields(SolrRules.solrFieldNames(rowType), solrImplementor.fieldMappings), String.class));
+ final Expression filterQueries = list.append("filterQueries", constantArrayList(solrImplementor.filterQueries, String.class));
+ final Expression order = list.append("order", constantArrayList(solrImplementor.order, String.class));
+ final Expression limit = list.append("limit", Expressions.constant(solrImplementor.limitValue));
+ Expression enumerable = list.append("enumerable", Expressions.call(table, SolrMethod.SOLR_QUERYABLE_QUERY.method,
+ fields, filterQueries, order, limit));
+ if (CalcitePrepareImpl.DEBUG) {
+ System.out.println("Solr: " + filterQueries);
+ }
+ // Expose the pushed-down filters to test hooks.
+ Hook.QUERY_PLAN.run(filterQueries);
+ list.add(Expressions.return_(null, enumerable));
+ return implementor.result(physType, list.toBlock());
+ }
+
+ // Replaces each query field with its mapped name when a mapping exists;
+ // returns the input list untouched when there are no mappings.
+ private List<String> generateFields(List<String> queryFields, Map<String, String> fieldMappings) {
+ if(fieldMappings.isEmpty()) {
+ return queryFields;
+ } else {
+ List<String> fields = new ArrayList<>();
+ for(String field : queryFields) {
+ fields.add(fieldMappings.getOrDefault(field, field));
+ }
+ return fields;
+ }
+ }
+
+ /**
+ * E.g. {@code constantArrayList("x", "y")} returns
+ * "Arrays.asList('x', 'y')".
+ */
+ private static <T> MethodCallExpression constantArrayList(List<T> values, Class clazz) {
+ return Expressions.call(BuiltInMethod.ARRAYS_AS_LIST.method,
+ Expressions.newArrayInit(clazz, constantList(values)));
+ }
+
+ /**
+ * E.g. {@code constantList("x", "y")} returns
+ * {@code {ConstantExpression("x"), ConstantExpression("y")}}.
+ */
+ private static <T> List<Expression> constantList(List<T> values) {
+ return Lists.transform(values, Expressions::constant);
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b08a4636/solr/core/src/java/org/apache/solr/handler/sql/SolrToEnumerableConverterRule.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/sql/SolrToEnumerableConverterRule.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrToEnumerableConverterRule.java
new file mode 100644
index 0000000..a97e047
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrToEnumerableConverterRule.java
@@ -0,0 +1,40 @@
+package org.apache.solr.handler.sql;
+
+import org.apache.calcite.adapter.enumerable.EnumerableConvention;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Rule to convert a relational expression from {@link SolrRel#CONVENTION} to
+ * {@link EnumerableConvention}, wrapping the node in a
+ * {@link SolrToEnumerableConverter}.
+ */
+class SolrToEnumerableConverterRule extends ConverterRule {
+ // Stateless singleton; registered by SolrTableScan#register.
+ static final ConverterRule INSTANCE = new SolrToEnumerableConverterRule();
+
+ private SolrToEnumerableConverterRule() {
+ // Matches any RelNode in Solr convention.
+ super(RelNode.class, SolrRel.CONVENTION, EnumerableConvention.INSTANCE, "SolrToEnumerableConverterRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ RelTraitSet newTraitSet = rel.getTraitSet().replace(getOutConvention());
+ return new SolrToEnumerableConverter(rel.getCluster(), newTraitSet, rel);
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b08a4636/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java b/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java
index 893b6fe..26975cd 100644
--- a/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java
+++ b/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java
@@ -18,23 +18,20 @@ package org.apache.solr.handler;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import com.facebook.presto.sql.parser.SqlParser;
-import com.facebook.presto.sql.tree.Statement;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.ExceptionStream;
import org.apache.solr.client.solrj.io.stream.SolrStream;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
-import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.params.CommonParams;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
public class TestSQLHandler extends AbstractFullDistribZkTestBase {
@@ -86,134 +83,19 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
@Test
public void doTest() throws Exception {
waitForRecoveriesToFinish(false);
- testPredicate();
testBasicSelect();
testMixedCaseFields();
- testBasicGrouping();
- testBasicGroupingFacets();
- testSelectDistinct();
- testSelectDistinctFacets();
- testAggregatesWithoutGrouping();
- testSQLException();
- testTimeSeriesGrouping();
- testTimeSeriesGroupingFacet();
- testParallelBasicGrouping();
- testParallelSelectDistinct();
- testParallelTimeSeriesGrouping();
- testCatalogStream();
- testSchemasStream();
- testTablesStream();
- }
-
- private void testPredicate() throws Exception {
-
- SqlParser parser = new SqlParser();
- String sql = "select a from b where c = 'd'";
- Statement statement = parser.createStatement(sql);
- SQLHandler.SQLVisitor sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder());
- sqlVistor.process(statement, new Integer(0));
-
- assert(sqlVistor.query.equals("(c:\"d\")"));
-
- //Add parens
- parser = new SqlParser();
- sql = "select a from b where (c = 'd')";
- statement = parser.createStatement(sql);
- sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder());
- sqlVistor.process(statement, new Integer(0));
-
- assert(sqlVistor.query.equals("(c:\"d\")"));
-
-
- //Upper case
- parser = new SqlParser();
- sql = "select a from b where ('CcC' = 'D')";
- statement = parser.createStatement(sql);
- sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder());
- sqlVistor.process(statement, new Integer(0));
- assert(sqlVistor.query.equals("(CcC:\"D\")"));
-
- //Phrase
- parser = new SqlParser();
- sql = "select a from b where (c = 'd d')";
- statement = parser.createStatement(sql);
- sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder());
- sqlVistor.process(statement, new Integer(0));
-
- assert(sqlVistor.query.equals("(c:\"d d\")"));
-
- // AND
- parser = new SqlParser();
- sql = "select a from b where ((c = 'd') AND (l = 'z'))";
- statement = parser.createStatement(sql);
- sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder());
- sqlVistor.process(statement, new Integer(0));
-
- assert(sqlVistor.query.equals("((c:\"d\") AND (l:\"z\"))"));
-
- // OR
-
- parser = new SqlParser();
- sql = "select a from b where ((c = 'd') OR (l = 'z'))";
- statement = parser.createStatement(sql);
- sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder());
- sqlVistor.process(statement, new Integer(0));
-
- assert(sqlVistor.query.equals("((c:\"d\") OR (l:\"z\"))"));
-
- // AND NOT
-
- parser = new SqlParser();
- sql = "select a from b where ((c = 'd') AND NOT (l = 'z'))";
- statement = parser.createStatement(sql);
- sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder());
- sqlVistor.process(statement, new Integer(0));
-
- assert(sqlVistor.query.equals("((c:\"d\") AND -(l:\"z\"))"));
-
- // NESTED
- parser = new SqlParser();
- sql = "select a from b where ((c = 'd') OR ((l = 'z') AND (m = 'j')))";
- statement = parser.createStatement(sql);
- sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder());
- sqlVistor.process(statement, new Integer(0));
-
- assert(sqlVistor.query.equals("((c:\"d\") OR ((l:\"z\") AND (m:\"j\")))"));
-
- // NESTED NOT
- parser = new SqlParser();
- sql = "select a from b where ((c = 'd') OR ((l = 'z') AND NOT (m = 'j')))";
- statement = parser.createStatement(sql);
- sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder());
- sqlVistor.process(statement, new Integer(0));
-
- assert(sqlVistor.query.equals("((c:\"d\") OR ((l:\"z\") AND -(m:\"j\")))"));
-
- // RANGE - Will have to do until SQL BETWEEN is supported.
- // NESTED
- parser = new SqlParser();
- sql = "select a from b where ((c = '[0 TO 100]') OR ((l = '(z)') AND (m = 'j')))";
- statement = parser.createStatement(sql);
- sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder());
- sqlVistor.process(statement, new Integer(0));
-
- assert(sqlVistor.query.equals("((c:[0 TO 100]) OR ((l:(z)) AND (m:\"j\")))"));
-
- // Wildcard
- parser = new SqlParser();
- sql = "select a from b where ((c = '[0 TO 100]') OR ((l = '(z*)') AND (m = 'j')))";
- statement = parser.createStatement(sql);
- sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder());
- sqlVistor.process(statement, new Integer(0));
- assert(sqlVistor.query.equals("((c:[0 TO 100]) OR ((l:(z*)) AND (m:\"j\")))"));
-
- // Complex Lucene/Solr Query
- parser = new SqlParser();
- sql = "select a from b where (('c' = '[0 TO 100]') OR ((l = '(z*)') AND ('M' = '(j OR (k NOT s))')))";
- statement = parser.createStatement(sql);
- sqlVistor = new SQLHandler.SQLVisitor(new StringBuilder());
- sqlVistor.process(statement, new Integer(0));
- assert(sqlVistor.query.equals("((c:[0 TO 100]) OR ((l:(z*)) AND (M:(j OR (k NOT s)))))"));
+// testBasicGrouping();
+// testBasicGroupingFacets();
+// testSelectDistinct();
+// testSelectDistinctFacets();
+// testAggregatesWithoutGrouping();
+// testSQLException();
+// testTimeSeriesGrouping();
+// testTimeSeriesGroupingFacet();
+// testParallelBasicGrouping();
+// testParallelSelectDistinct();
+// testParallelTimeSeriesGrouping();
}
private void testBasicSelect() throws Exception {
@@ -234,9 +116,10 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
indexDoc(sdoc("id", "7", "text", "XXXX XXXX", "str_s", "c", "field_i", "50"));
indexDoc(sdoc("id", "8", "text", "XXXX XXXX", "str_s", "c", "field_i", "60"));
commit();
+
Map params = new HashMap();
params.put(CommonParams.QT, "/sql");
- params.put("stmt", "select 'id', field_i, str_s from collection1 where 'text'='XXXX' order by field_i desc");
+ params.put("stmt", "select id, field_i, str_s from collection1 where text='XXXX' order by field_i desc");
SolrStream solrStream = new SolrStream(jetty.url, params);
List<Tuple> tuples = getTuples(solrStream);
@@ -290,7 +173,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
params.put(CommonParams.QT, "/sql");
//Test unlimited unsorted result. Should sort on _version_ desc
- params.put("stmt", "select 'id', field_i, str_s from collection1 where 'text'='XXXX'");
+ params.put("stmt", "select id, field_i, str_s from collection1 where text='XXXX'");
solrStream = new SolrStream(jetty.url, params);
tuples = getTuples(solrStream);
@@ -424,14 +307,11 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
assert(tuple.getLong("myId") == 1);
assert(tuple.getLong("myInt") == 7);
assert(tuple.get("myString").equals("a"));
-
-
} finally {
delete();
}
}
-
private void testMixedCaseFields() throws Exception {
try {
@@ -452,7 +332,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
commit();
Map params = new HashMap();
params.put(CommonParams.QT, "/sql");
- params.put("stmt", "select id, Field_i, Str_s from Collection1 where Text_t='XXXX' order by Field_i desc");
+ params.put("stmt", "select id, Field_i, Str_s from collection1 where Text_t='XXXX' order by Field_i desc");
SolrStream solrStream = new SolrStream(jetty.url, params);
List<Tuple> tuples = getTuples(solrStream);
@@ -503,7 +383,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
params = new HashMap();
params.put(CommonParams.QT, "/sql");
- params.put("stmt", "select Str_s, sum(Field_i) from Collection1 where 'id'='(1 8)' group by Str_s having (sum(Field_i) = 7 OR 'sum(Field_i)' = 60) order by 'sum(Field_i)' desc");
+ params.put("stmt", "select Str_s, sum(Field_i) as `sum(Field_i)` from collection1 where id='(1 8)' group by Str_s having (sum(Field_i) = 7 OR sum(Field_i) = 60) order by sum(Field_i) desc");
solrStream = new SolrStream(jetty.url, params);
tuples = getTuples(solrStream);
@@ -520,7 +400,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
params = new HashMap();
params.put(CommonParams.QT, "/sql");
- params.put("stmt", "select Str_s, sum(Field_i) from Collection1 where 'id'='(1 8)' group by 'Str_s' having (sum(Field_i) = 7 OR 'sum(Field_i)' = 60) order by 'sum(Field_i)' desc");
+ params.put("stmt", "select Str_s, sum(Field_i) as `sum(Field_i)` from collection1 where id='(1 8)' group by Str_s having (sum(Field_i) = 7 OR sum(Field_i) = 60) order by sum(Field_i) desc");
solrStream = new SolrStream(jetty.url, params);
tuples = getTuples(solrStream);
@@ -2422,74 +2302,6 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
}
}
- private void testCatalogStream() throws Exception {
- CloudJettyRunner jetty = this.cloudJettys.get(0);
-
- Map<String, Object> params = new HashMap<>();
- params.put(CommonParams.QT, "/sql");
- params.put("numWorkers", 2);
- params.put("stmt", "select TABLE_CAT from _CATALOGS_");
-
- SolrStream solrStream = new SolrStream(jetty.url, params);
- List<Tuple> tuples = getTuples(solrStream);
-
- assertEquals(tuples.size(), 1);
- assertEquals(tuples.get(0).getString("TABLE_CAT"), zkServer.getZkAddress());
- }
-
- private void testSchemasStream() throws Exception {
- CloudJettyRunner jetty = this.cloudJettys.get(0);
-
- Map<String, Object> params = new HashMap<>();
- params.put(CommonParams.QT, "/sql");
- params.put("numWorkers", 2);
- params.put("stmt", "select TABLE_SCHEM, TABLE_CATALOG from _SCHEMAS_");
-
- SolrStream solrStream = new SolrStream(jetty.url, params);
- List<Tuple> tuples = getTuples(solrStream);
-
- assertEquals(tuples.size(), 0);
- }
-
- private void testTablesStream() throws Exception {
- CloudJettyRunner jetty = this.cloudJettys.get(0);
-
- Map<String, Object> params = new HashMap<>();
- params.put(CommonParams.QT, "/sql");
- params.put("numWorkers", 2);
- params.put("stmt", "select TABLE_CAT, TABLE_SCHEM, TABLE_NAME, TABLE_TYPE, REMARKS from _TABLES_");
-
- SolrStream solrStream = new SolrStream(jetty.url, params);
- List<Tuple> tuples = getTuples(solrStream);
-
- assertEquals(2, tuples.size());
-
- List<String> collections = new ArrayList<>();
- collections.addAll(cloudClient.getZkStateReader().getClusterState().getCollections());
- Collections.sort(collections);
- for (Tuple tuple : tuples) {
- assertEquals(zkServer.getZkAddress(), tuple.getString("TABLE_CAT"));
- assertNull(tuple.get("TABLE_SCHEM"));
- assertTrue(collections.contains(tuple.getString("TABLE_NAME")));
- assertEquals("TABLE", tuple.getString("TABLE_TYPE"));
- assertNull(tuple.get("REMARKS"));
- }
-
- tuples = getTuples(solrStream);
- assertEquals(2, tuples.size());
-
- collections = new ArrayList<>();
- collections.addAll(cloudClient.getZkStateReader().getClusterState().getCollections());
- Collections.sort(collections);
- for (Tuple tuple : tuples) {
- assertEquals(zkServer.getZkAddress(), tuple.getString("TABLE_CAT"));
- assertNull(tuple.get("TABLE_SCHEM"));
- assertTrue(collections.contains(tuple.getString("TABLE_NAME")));
- assertEquals("TABLE", tuple.getString("TABLE_TYPE"));
- assertNull(tuple.get("REMARKS"));
- }
- }
-
protected List<Tuple> getTuples(TupleStream tupleStream) throws IOException {
tupleStream.open();
List<Tuple> tuples = new ArrayList();
[2/2] lucene-solr:jira/solr-8593: First commit for Calcite SQLHandler
integration
Posted by kr...@apache.org.
First commit for Calcite SQLHandler integration
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/b08a4636
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/b08a4636
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/b08a4636
Branch: refs/heads/jira/solr-8593
Commit: b08a463639efe11b62be67324d638f572c7d668e
Parents: 3e6de60
Author: Kevin Risden <kr...@apache.org>
Authored: Thu Apr 28 10:34:13 2016 -0500
Committer: Kevin Risden <kr...@apache.org>
Committed: Thu Apr 28 10:34:13 2016 -0500
----------------------------------------------------------------------
solr/core/ivy.xml | 9 +-
.../org/apache/solr/handler/SQLHandler.java | 1621 +-----------------
.../apache/solr/handler/sql/SolrEnumerator.java | 90 +
.../org/apache/solr/handler/sql/SolrFilter.java | 170 ++
.../org/apache/solr/handler/sql/SolrMethod.java | 47 +
.../apache/solr/handler/sql/SolrProject.java | 69 +
.../org/apache/solr/handler/sql/SolrRel.java | 75 +
.../org/apache/solr/handler/sql/SolrRules.java | 246 +++
.../org/apache/solr/handler/sql/SolrSchema.java | 105 ++
.../solr/handler/sql/SolrSchemaFactory.java | 35 +
.../org/apache/solr/handler/sql/SolrSort.java | 85 +
.../org/apache/solr/handler/sql/SolrTable.java | 171 ++
.../apache/solr/handler/sql/SolrTableScan.java | 80 +
.../handler/sql/SolrToEnumerableConverter.java | 116 ++
.../sql/SolrToEnumerableConverterRule.java | 40 +
.../org/apache/solr/handler/TestSQLHandler.java | 224 +--
16 files changed, 1384 insertions(+), 1799 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b08a4636/solr/core/ivy.xml
----------------------------------------------------------------------
diff --git a/solr/core/ivy.xml b/solr/core/ivy.xml
index 5f8706f..a885b75 100644
--- a/solr/core/ivy.xml
+++ b/solr/core/ivy.xml
@@ -131,10 +131,13 @@
<!-- StatsComponents percentiles Dependencies-->
<dependency org="com.tdunning" name="t-digest" rev="${/com.tdunning/t-digest}" conf="compile->*"/>
- <!-- SQL Parser -->
- <dependency org="com.facebook.presto" name="presto-parser" rev="${/com.facebook.presto/presto-parser}"/>
- <dependency org="io.airlift" name="slice" rev="${/io.airlift/slice}"/>
+ <!-- SQL Parser -->
+ <dependency org="org.apache.calcite" name="calcite-core" rev="1.6.0"/>
+ <dependency org="org.apache.calcite" name="calcite-linq4j" rev="1.6.0"/>
+ <dependency org="net.hydromatic" name="eigenbase-properties" rev="1.1.5"/>
+ <dependency org="org.codehaus.janino" name="janino" rev="2.7.6"/>
+ <dependency org="org.codehaus.janino" name="commons-compiler" rev="2.7.6"/>
<exclude org="*" ext="*" matcher="regexp" type="${ivy.exclude.types}"/>
</dependencies>
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b08a4636/solr/core/src/java/org/apache/solr/handler/SQLHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/SQLHandler.java b/solr/core/src/java/org/apache/solr/handler/SQLHandler.java
index c26c5a8..8a2f2e5 100644
--- a/solr/core/src/java/org/apache/solr/handler/SQLHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/SQLHandler.java
@@ -16,77 +16,31 @@
*/
package org.apache.solr.handler;
-import java.io.IOException;
import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Locale;
-import java.util.Set;
+import java.util.Properties;
-import com.facebook.presto.sql.tree.*;
-import com.google.common.base.Strings;
-import com.google.common.collect.Iterables;
-
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.client.solrj.io.Tuple;
-import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
-import org.apache.solr.client.solrj.io.comp.FieldComparator;
-import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
-import org.apache.solr.client.solrj.io.comp.StreamComparator;
-import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
-import org.apache.solr.client.solrj.io.eq.MultipleFieldEqualitor;
-import org.apache.solr.client.solrj.io.eq.StreamEqualitor;
-import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
-import org.apache.solr.client.solrj.io.stream.FacetStream;
-import org.apache.solr.client.solrj.io.stream.ParallelStream;
-import org.apache.solr.client.solrj.io.stream.RankStream;
-import org.apache.solr.client.solrj.io.stream.RollupStream;
-import org.apache.solr.client.solrj.io.stream.SelectStream;
-import org.apache.solr.client.solrj.io.stream.StatsStream;
-import org.apache.solr.client.solrj.io.stream.StreamContext;
-import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.ExceptionStream;
-import org.apache.solr.client.solrj.io.stream.UniqueStream;
-import org.apache.solr.client.solrj.io.stream.expr.Explanation;
-import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
-import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
-import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
-import org.apache.solr.client.solrj.io.stream.metrics.*;
+import org.apache.solr.client.solrj.io.stream.JDBCStream;
+import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.sql.SolrSchemaFactory;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.security.AuthorizationContext;
import org.apache.solr.security.PermissionNameProvider;
import org.apache.solr.util.plugin.SolrCoreAware;
-
-import java.util.List;
-import java.util.Map;
-import java.util.HashMap;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.facebook.presto.sql.parser.SqlParser;
-
public class SQLHandler extends RequestHandlerBase implements SolrCoreAware , PermissionNameProvider {
private static String defaultZkhost = null;
private static String defaultWorkerCollection = null;
- private static List<String> remove;
-
- static {
- remove = new ArrayList();
- remove.add("count(*)");
- }
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -110,34 +64,40 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware , Pe
params = adjustParams(params);
req.setParams(params);
String sql = params.get("stmt");
- int numWorkers = params.getInt("numWorkers", 1);
- String workerCollection = params.get("workerCollection", defaultWorkerCollection);
- String workerZkhost = params.get("workerZkhost",defaultZkhost);
- String mode = params.get("aggregationMode", "map_reduce");
- StreamContext context = new StreamContext();
-
- // JDBC driver requires metadata from the SQLHandler. Default to false since this adds a new Metadata stream.
- boolean includeMetadata = params.getBool("includeMetadata", false);
+ TupleStream tupleStream = null;
try {
-
if(sql == null) {
throw new Exception("stmt parameter cannot be null");
}
- context.setSolrClientCache(StreamHandler.clientCache);
-
- TupleStream tupleStream = SQLTupleStreamParser.parse(sql,
- numWorkers,
- workerCollection,
- workerZkhost,
- AggregationMode.getMode(mode),
- includeMetadata,
- context);
-
- rsp.add("result-set", new StreamHandler.TimerStream(new ExceptionStream(tupleStream)));
+ Properties info = new Properties();
+ info.setProperty("model",
+ "inline:{\n" +
+ " \"version\": \"1.0\",\n" +
+ " \"defaultSchema\": \"" + defaultZkhost + "\",\n" +
+ " \"schemas\": [\n" +
+ " {\n" +
+ " \"name\": \"" + defaultZkhost + "\",\n" +
+ " \"type\": \"custom\",\n" +
+ " \"factory\": \"" + SolrSchemaFactory.class.getName() + "\",\n" +
+ " \"operand\": {\n" +
+ " \"zk\": \"" + defaultZkhost + "\"\n" +
+ " }\n" +
+ " }\n" +
+ " ]\n" +
+ "}");
+ info.setProperty("lex", "MYSQL");
+
+ tupleStream = new StreamHandler.TimerStream(new ExceptionStream(
+ new JDBCStream("jdbc:calcite:", sql, null, info, null)));
+
+ rsp.add("result-set", tupleStream);
} catch(Exception e) {
//Catch the SQL parsing and query transformation exceptions.
+ if(tupleStream != null) {
+ tupleStream.close();
+ }
SolrException.log(logger, e);
rsp.add("result-set", new StreamHandler.DummyErrorStream(e));
}
@@ -157,1523 +117,4 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware , Pe
public String getSource() {
return null;
}
-
- public static class SQLTupleStreamParser {
-
- public static TupleStream parse(String sql,
- int numWorkers,
- String workerCollection,
- String workerZkhost,
- AggregationMode aggregationMode,
- boolean includeMetadata,
- StreamContext context) throws IOException {
- SqlParser parser = new SqlParser();
- Statement statement = parser.createStatement(sql);
-
- SQLVisitor sqlVistor = new SQLVisitor(new StringBuilder());
-
- sqlVistor.process(statement, new Integer(0));
- sqlVistor.reverseAliases();
-
- TupleStream sqlStream = null;
-
- if(sqlVistor.table.toUpperCase(Locale.ROOT).contains("_CATALOGS_")) {
- sqlStream = new SelectStream(new CatalogsStream(defaultZkhost), sqlVistor.columnAliases);
- } else if(sqlVistor.table.toUpperCase(Locale.ROOT).contains("_SCHEMAS_")) {
- sqlStream = new SelectStream(new SchemasStream(defaultZkhost), sqlVistor.columnAliases);
- } else if(sqlVistor.table.toUpperCase(Locale.ROOT).contains("_TABLES_")) {
- sqlStream = new SelectStream(new TableStream(defaultZkhost), sqlVistor.columnAliases);
- } else if(sqlVistor.groupByQuery) {
- if(aggregationMode == AggregationMode.FACET) {
- sqlStream = doGroupByWithAggregatesFacets(sqlVistor);
- } else {
- context.numWorkers = numWorkers;
- sqlStream = doGroupByWithAggregates(sqlVistor, numWorkers, workerCollection, workerZkhost);
- }
- } else if(sqlVistor.isDistinct) {
- if(aggregationMode == AggregationMode.FACET) {
- sqlStream = doSelectDistinctFacets(sqlVistor);
- } else {
- context.numWorkers = numWorkers;
- sqlStream = doSelectDistinct(sqlVistor, numWorkers, workerCollection, workerZkhost);
- }
- } else {
- sqlStream = doSelect(sqlVistor);
- }
-
- if(includeMetadata) {
- sqlStream = new MetadataStream(sqlStream, sqlVistor);
- }
-
- sqlStream.setStreamContext(context);
- return sqlStream;
- }
- }
-
- private static TupleStream doGroupByWithAggregates(SQLVisitor sqlVisitor,
- int numWorkers,
- String workerCollection,
- String workerZkHost) throws IOException {
-
- Set<String> fieldSet = new HashSet();
- Bucket[] buckets = getBuckets(sqlVisitor.groupBy, fieldSet);
- Metric[] metrics = getMetrics(sqlVisitor.fields, fieldSet);
- if(metrics.length == 0) {
- throw new IOException("Group by queries must include atleast one aggregate function.");
- }
-
- String fl = fields(fieldSet);
- String sortDirection = getSortDirection(sqlVisitor.sorts);
- String sort = bucketSort(buckets, sortDirection);
-
- TableSpec tableSpec = new TableSpec(sqlVisitor.table, defaultZkhost);
-
- String zkHost = tableSpec.zkHost;
- String collection = tableSpec.collection;
- Map<String, String> params = new HashMap();
-
- params.put(CommonParams.FL, fl);
- params.put(CommonParams.Q, sqlVisitor.query);
- //Always use the /export handler for Group By Queries because it requires exporting full result sets.
- params.put(CommonParams.QT, "/export");
-
- if(numWorkers > 1) {
- params.put("partitionKeys", getPartitionKeys(buckets));
- }
-
- params.put("sort", sort);
-
- TupleStream tupleStream = null;
-
- CloudSolrStream cstream = new CloudSolrStream(zkHost, collection, params);
- tupleStream = new RollupStream(cstream, buckets, metrics);
-
- if(numWorkers > 1) {
- // Do the rollups in parallel
- // Maintain the sort of the Tuples coming from the workers.
- StreamComparator comp = bucketSortComp(buckets, sortDirection);
- ParallelStream parallelStream = new ParallelStream(workerZkHost, workerCollection, tupleStream, numWorkers, comp);
-
- StreamFactory factory = new StreamFactory()
- .withFunctionName("search", CloudSolrStream.class)
- .withFunctionName("parallel", ParallelStream.class)
- .withFunctionName("rollup", RollupStream.class)
- .withFunctionName("sum", SumMetric.class)
- .withFunctionName("min", MinMetric.class)
- .withFunctionName("max", MaxMetric.class)
- .withFunctionName("avg", MeanMetric.class)
- .withFunctionName("count", CountMetric.class);
-
- parallelStream.setStreamFactory(factory);
- tupleStream = parallelStream;
- }
-
- //TODO: This should be done on the workers, but it won't serialize because it relies on Presto classes.
- // Once we make this a Expressionable the problem will be solved.
-
- if(sqlVisitor.havingExpression != null) {
- tupleStream = new HavingStream(tupleStream, sqlVisitor.havingExpression, sqlVisitor.reverseColumnAliases );
- }
-
- if(sqlVisitor.sorts != null && sqlVisitor.sorts.size() > 0) {
- if(!sortsEqual(buckets, sortDirection, sqlVisitor.sorts, sqlVisitor.reverseColumnAliases)) {
- int limit = sqlVisitor.limit == -1 ? 100 : sqlVisitor.limit;
- StreamComparator comp = getComp(sqlVisitor.sorts, sqlVisitor.reverseColumnAliases);
- //Rank the Tuples
- //If parallel stream is used ALL the Rolled up tuples from the workers will be ranked
- //Providing a true Top or Bottom.
- tupleStream = new RankStream(tupleStream, limit, comp);
- } else {
- // Sort is the same as the same as the underlying stream
- // Only need to limit the result, not Rank the result
- if(sqlVisitor.limit > -1) {
- tupleStream = new LimitStream(tupleStream, sqlVisitor.limit);
- }
- }
- }
-
- if(sqlVisitor.hasColumnAliases) {
- tupleStream = new SelectStream(tupleStream, sqlVisitor.columnAliases);
- }
-
- return tupleStream;
- }
-
- private static TupleStream doSelectDistinct(SQLVisitor sqlVisitor,
- int numWorkers,
- String workerCollection,
- String workerZkHost) throws IOException {
-
- Set<String> fieldSet = new HashSet();
- Bucket[] buckets = getBuckets(sqlVisitor.fields, fieldSet);
- Metric[] metrics = getMetrics(sqlVisitor.fields, fieldSet);
-
- if(metrics.length > 0) {
- throw new IOException("Select Distinct queries cannot include aggregate functions.");
- }
-
- String fl = fields(fieldSet);
-
- String sort = null;
- StreamEqualitor ecomp = null;
- StreamComparator comp = null;
-
- if(sqlVisitor.sorts != null && sqlVisitor.sorts.size() > 0) {
- StreamComparator[] adjustedSorts = adjustSorts(sqlVisitor.sorts, buckets, sqlVisitor.reverseColumnAliases);
- // Because of the way adjustSorts works we know that each FieldComparator has a single
- // field name. For this reason we can just look at the leftFieldName
- FieldEqualitor[] fieldEqualitors = new FieldEqualitor[adjustedSorts.length];
- StringBuilder buf = new StringBuilder();
- for(int i=0; i<adjustedSorts.length; i++) {
- FieldComparator fieldComparator = (FieldComparator)adjustedSorts[i];
- fieldEqualitors[i] = new FieldEqualitor(fieldComparator.getLeftFieldName());
- if(i>0) {
- buf.append(",");
- }
- buf.append(fieldComparator.getLeftFieldName()).append(" ").append(fieldComparator.getOrder().toString());
- }
-
- sort = buf.toString();
-
- if(adjustedSorts.length == 1) {
- ecomp = fieldEqualitors[0];
- comp = adjustedSorts[0];
- } else {
- ecomp = new MultipleFieldEqualitor(fieldEqualitors);
- comp = new MultipleFieldComparator(adjustedSorts);
- }
- } else {
- StringBuilder sortBuf = new StringBuilder();
- FieldEqualitor[] equalitors = new FieldEqualitor[buckets.length];
- StreamComparator[] streamComparators = new StreamComparator[buckets.length];
- for(int i=0; i<buckets.length; i++) {
- equalitors[i] = new FieldEqualitor(buckets[i].toString());
- streamComparators[i] = new FieldComparator(buckets[i].toString(), ComparatorOrder.ASCENDING);
- if(i>0) {
- sortBuf.append(',');
- }
- sortBuf.append(buckets[i].toString()).append(" asc");
- }
-
- sort = sortBuf.toString();
-
- if(equalitors.length == 1) {
- ecomp = equalitors[0];
- comp = streamComparators[0];
- } else {
- ecomp = new MultipleFieldEqualitor(equalitors);
- comp = new MultipleFieldComparator(streamComparators);
- }
- }
-
- TableSpec tableSpec = new TableSpec(sqlVisitor.table, defaultZkhost);
-
- String zkHost = tableSpec.zkHost;
- String collection = tableSpec.collection;
- Map<String, String> params = new HashMap();
-
- params.put(CommonParams.FL, fl);
- params.put(CommonParams.Q, sqlVisitor.query);
- //Always use the /export handler for Distinct Queries because it requires exporting full result sets.
- params.put(CommonParams.QT, "/export");
-
- if(numWorkers > 1) {
- params.put("partitionKeys", getPartitionKeys(buckets));
- }
-
- params.put("sort", sort);
-
- TupleStream tupleStream = null;
-
- CloudSolrStream cstream = new CloudSolrStream(zkHost, collection, params);
- tupleStream = new UniqueStream(cstream, ecomp);
-
- if(numWorkers > 1) {
- // Do the unique in parallel
- // Maintain the sort of the Tuples coming from the workers.
- ParallelStream parallelStream = new ParallelStream(workerZkHost, workerCollection, tupleStream, numWorkers, comp);
-
- StreamFactory factory = new StreamFactory()
- .withFunctionName("search", CloudSolrStream.class)
- .withFunctionName("parallel", ParallelStream.class)
- .withFunctionName("unique", UniqueStream.class);
-
- parallelStream.setStreamFactory(factory);
- tupleStream = parallelStream;
- }
-
- if(sqlVisitor.limit > 0) {
- tupleStream = new LimitStream(tupleStream, sqlVisitor.limit);
- }
-
- if(sqlVisitor.hasColumnAliases) {
- tupleStream = new SelectStream(tupleStream, sqlVisitor.columnAliases);
- }
-
- return tupleStream;
- }
-
- private static StreamComparator[] adjustSorts(List<SortItem> sorts, Bucket[] buckets, Map<String, String> reverseColumnAliases) throws IOException {
- List<FieldComparator> adjustedSorts = new ArrayList();
- Set<String> bucketFields = new HashSet();
- Set<String> sortFields = new HashSet();
-
- for(SortItem sortItem : sorts) {
-
- sortFields.add(getSortField(sortItem, reverseColumnAliases));
- adjustedSorts.add(new FieldComparator(getSortField(sortItem, reverseColumnAliases),
- ascDescComp(sortItem.getOrdering().toString())));
- }
-
- for(Bucket bucket : buckets) {
- bucketFields.add(bucket.toString());
- }
-
- for(SortItem sortItem : sorts) {
- String sortField = getSortField(sortItem, reverseColumnAliases);
- if(!bucketFields.contains(sortField)) {
- throw new IOException("All sort fields must be in the field list.");
- }
- }
-
- //Add sort fields if needed
- if(sorts.size() < buckets.length) {
- for(Bucket bucket : buckets) {
- String b = bucket.toString();
- if(!sortFields.contains(b)) {
- adjustedSorts.add(new FieldComparator(bucket.toString(), ComparatorOrder.ASCENDING));
- }
- }
- }
-
- return adjustedSorts.toArray(new FieldComparator[adjustedSorts.size()]);
- }
-
- private static TupleStream doSelectDistinctFacets(SQLVisitor sqlVisitor) throws IOException {
-
- Set<String> fieldSet = new HashSet();
- Bucket[] buckets = getBuckets(sqlVisitor.fields, fieldSet);
- Metric[] metrics = getMetrics(sqlVisitor.fields, fieldSet);
-
- if(metrics.length > 0) {
- throw new IOException("Select Distinct queries cannot include aggregate functions.");
- }
-
- TableSpec tableSpec = new TableSpec(sqlVisitor.table, defaultZkhost);
-
- String zkHost = tableSpec.zkHost;
- String collection = tableSpec.collection;
- Map<String, String> params = new HashMap();
-
- params.put(CommonParams.Q, sqlVisitor.query);
-
- int limit = sqlVisitor.limit > 0 ? sqlVisitor.limit : 100;
-
- FieldComparator[] sorts = null;
-
- if(sqlVisitor.sorts == null) {
- sorts = new FieldComparator[buckets.length];
- for(int i=0; i<sorts.length; i++) {
- sorts[i] = new FieldComparator("index", ComparatorOrder.ASCENDING);
- }
- } else {
- StreamComparator[] comps = adjustSorts(sqlVisitor.sorts, buckets, sqlVisitor.reverseColumnAliases);
- sorts = new FieldComparator[comps.length];
- for(int i=0; i<comps.length; i++) {
- sorts[i] = (FieldComparator)comps[i];
- }
- }
-
- TupleStream tupleStream = new FacetStream(zkHost,
- collection,
- params,
- buckets,
- metrics,
- sorts,
- limit);
-
- if(sqlVisitor.limit > 0) {
- tupleStream = new LimitStream(tupleStream, sqlVisitor.limit);
- }
-
- return new SelectStream(tupleStream, sqlVisitor.columnAliases);
- }
-
- private static TupleStream doGroupByWithAggregatesFacets(SQLVisitor sqlVisitor) throws IOException {
-
- Set<String> fieldSet = new HashSet();
- Bucket[] buckets = getBuckets(sqlVisitor.groupBy, fieldSet);
- Metric[] metrics = getMetrics(sqlVisitor.fields, fieldSet);
- if(metrics.length == 0) {
- throw new IOException("Group by queries must include atleast one aggregate function.");
- }
-
- TableSpec tableSpec = new TableSpec(sqlVisitor.table, defaultZkhost);
-
- String zkHost = tableSpec.zkHost;
- String collection = tableSpec.collection;
- Map<String, String> params = new HashMap();
-
- params.put(CommonParams.Q, sqlVisitor.query);
-
- int limit = sqlVisitor.limit > 0 ? sqlVisitor.limit : 100;
-
- FieldComparator[] sorts = null;
-
- if(sqlVisitor.sorts == null) {
- sorts = new FieldComparator[buckets.length];
- for(int i=0; i<sorts.length; i++) {
- sorts[i] = new FieldComparator("index", ComparatorOrder.ASCENDING);
- }
- } else {
- sorts = getComps(sqlVisitor.sorts, sqlVisitor.reverseColumnAliases);
- }
-
- TupleStream tupleStream = new FacetStream(zkHost,
- collection,
- params,
- buckets,
- metrics,
- sorts,
- limit);
-
- if(sqlVisitor.havingExpression != null) {
- tupleStream = new HavingStream(tupleStream, sqlVisitor.havingExpression, sqlVisitor.reverseColumnAliases);
- }
-
- if(sqlVisitor.limit > 0)
- {
- tupleStream = new LimitStream(tupleStream, sqlVisitor.limit);
- }
-
- if(sqlVisitor.hasColumnAliases) {
- tupleStream = new SelectStream(tupleStream, sqlVisitor.columnAliases);
- }
-
- return tupleStream;
- }
-
- private static TupleStream doSelect(SQLVisitor sqlVisitor) throws IOException {
- List<String> fields = sqlVisitor.fields;
- Set<String> fieldSet = new HashSet();
- Metric[] metrics = getMetrics(fields, fieldSet);
- if(metrics.length > 0) {
- return doAggregates(sqlVisitor, metrics);
- }
-
- StringBuilder flbuf = new StringBuilder();
- boolean comma = false;
-
- if(fields.size() == 0) {
- throw new IOException("Select columns must be specified.");
- }
-
- TableSpec tableSpec = new TableSpec(sqlVisitor.table, defaultZkhost);
-
- String zkHost = tableSpec.zkHost;
- String collection = tableSpec.collection;
-
- boolean score = false;
-
- for (String field : fields) {
-
- if(field.contains("(")) {
- throw new IOException("Aggregate functions only supported with group by queries.");
- }
-
- if(field.contains("*")) {
- throw new IOException("* is not supported for column selection.");
- }
-
- if(field.equals("score")) {
- if(sqlVisitor.limit < 0) {
- throw new IOException("score is not a valid field for unlimited select queries");
- } else {
- score = true;
- }
- }
-
- if (comma) {
- flbuf.append(",");
- }
-
- comma = true;
- flbuf.append(field);
- }
-
- String fl = flbuf.toString();
-
- List<SortItem> sorts = sqlVisitor.sorts;
-
- StringBuilder siBuf = new StringBuilder();
-
- comma = false;
-
- if(sorts != null) {
- for (SortItem sortItem : sorts) {
- if (comma) {
- siBuf.append(",");
- }
- siBuf.append(getSortField(sortItem, sqlVisitor.reverseColumnAliases) + " " + ascDesc(sortItem.getOrdering().toString()));
- }
- } else {
- if(sqlVisitor.limit < 0) {
- siBuf.append("_version_ desc");
- fl = fl+",_version_";
- } else {
- siBuf.append("score desc");
- if(!score) {
- fl = fl+",score";
- }
- }
- }
-
- Map<String, String> params = new HashMap();
- params.put("fl", fl.toString());
- params.put("q", sqlVisitor.query);
-
- if(siBuf.length() > 0) {
- params.put("sort", siBuf.toString());
- }
-
- TupleStream tupleStream;
-
- if(sqlVisitor.limit > -1) {
- params.put("rows", Integer.toString(sqlVisitor.limit));
- tupleStream = new LimitStream(new CloudSolrStream(zkHost, collection, params), sqlVisitor.limit);
- } else {
- //Only use the export handler when no limit is specified.
- params.put(CommonParams.QT, "/export");
- tupleStream = new CloudSolrStream(zkHost, collection, params);
- }
-
- return new SelectStream(tupleStream, sqlVisitor.columnAliases);
- }
-
- private static boolean sortsEqual(Bucket[] buckets, String direction, List<SortItem> sortItems, Map<String, String> reverseColumnAliases) {
- if(buckets.length != sortItems.size()) {
- return false;
- }
-
- for(int i=0; i< buckets.length; i++) {
- Bucket bucket = buckets[i];
- SortItem sortItem = sortItems.get(i);
- if(!bucket.toString().equals(getSortField(sortItem, reverseColumnAliases))) {
- return false;
- }
-
-
- if(!sortItem.getOrdering().toString().toLowerCase(Locale.ROOT).contains(direction.toLowerCase(Locale.ROOT))) {
- return false;
- }
- }
-
- return true;
- }
-
- private static TupleStream doAggregates(SQLVisitor sqlVisitor, Metric[] metrics) throws IOException {
-
- if(metrics.length != sqlVisitor.fields.size()) {
- throw new IOException("Only aggregate functions are allowed when group by is not specified.");
- }
-
- TableSpec tableSpec = new TableSpec(sqlVisitor.table, defaultZkhost);
-
- String zkHost = tableSpec.zkHost;
- String collection = tableSpec.collection;
- Map<String, String> params = new HashMap();
-
- params.put(CommonParams.Q, sqlVisitor.query);
-
- TupleStream tupleStream = new StatsStream(zkHost,
- collection,
- params,
- metrics);
-
- if(sqlVisitor.hasColumnAliases) {
- tupleStream = new SelectStream(tupleStream, sqlVisitor.columnAliases);
- }
-
- return tupleStream;
- }
-
- private static String bucketSort(Bucket[] buckets, String dir) {
- StringBuilder buf = new StringBuilder();
- boolean comma = false;
- for(Bucket bucket : buckets) {
- if(comma) {
- buf.append(",");
- }
- buf.append(bucket.toString()).append(" ").append(dir);
- comma = true;
- }
-
- return buf.toString();
- }
-
- private static String getPartitionKeys(Bucket[] buckets) {
- StringBuilder buf = new StringBuilder();
- boolean comma = false;
- for(Bucket bucket : buckets) {
- if(comma) {
- buf.append(",");
- }
- buf.append(bucket.toString());
- comma = true;
- }
- return buf.toString();
- }
-
- private static String getSortDirection(List<SortItem> sorts) {
- if(sorts != null && sorts.size() > 0) {
- for(SortItem item : sorts) {
- return ascDesc(stripSingleQuotes(stripQuotes(item.getOrdering().toString())));
- }
- }
-
- return "asc";
- }
-
- private static StreamComparator bucketSortComp(Bucket[] buckets, String dir) {
- FieldComparator[] comps = new FieldComparator[buckets.length];
- for(int i=0; i<buckets.length; i++) {
- ComparatorOrder comparatorOrder = ascDescComp(dir);
- String sortKey = buckets[i].toString();
- comps[i] = new FieldComparator(stripQuotes(sortKey), comparatorOrder);
- }
-
- if(comps.length == 1) {
- return comps[0];
- } else {
- return new MultipleFieldComparator(comps);
- }
- }
-
- private static StreamComparator getComp(List<SortItem> sortItems, Map<String, String> reverseColumnAliases) {
- FieldComparator[] comps = new FieldComparator[sortItems.size()];
- for(int i=0; i<sortItems.size(); i++) {
- SortItem sortItem = sortItems.get(i);
- String ordering = sortItem.getOrdering().toString();
- ComparatorOrder comparatorOrder = ascDescComp(ordering);
- String sortKey = getSortField(sortItem, reverseColumnAliases);
- comps[i] = new FieldComparator(sortKey, comparatorOrder);
- }
-
- if(comps.length == 1) {
- return comps[0];
- } else {
- return new MultipleFieldComparator(comps);
- }
- }
-
- private static FieldComparator[] getComps(List<SortItem> sortItems, Map<String, String> reverseColumnAliases) {
- FieldComparator[] comps = new FieldComparator[sortItems.size()];
- for(int i=0; i<sortItems.size(); i++) {
- SortItem sortItem = sortItems.get(i);
- String ordering = sortItem.getOrdering().toString();
- ComparatorOrder comparatorOrder = ascDescComp(ordering);
- String sortKey = getSortField(sortItem, reverseColumnAliases);
- comps[i] = new FieldComparator(sortKey, comparatorOrder);
- }
-
- return comps;
- }
-
-
- private static String fields(Set<String> fieldSet) {
- StringBuilder buf = new StringBuilder();
- boolean comma = false;
- for(String field : fieldSet) {
- if(comma) {
- buf.append(",");
- }
- buf.append(field);
- comma = true;
- }
-
- return buf.toString();
- }
-
- private static Metric[] getMetrics(List<String> fields, Set<String> fieldSet) throws IOException {
- List<Metric> metrics = new ArrayList();
- for(String field : fields) {
- if(field.contains("(")) {
-
- field = field.substring(0, field.length()-1);
- String[] parts = field.split("\\(");
- String function = parts[0];
- validateFunction(function);
- String column = parts[1];
- if(function.equals("min")) {
- metrics.add(new MinMetric(column));
- fieldSet.add(column);
- } else if(function.equals("max")) {
- metrics.add(new MaxMetric(column));
- fieldSet.add(column);
- } else if(function.equals("sum")) {
- metrics.add(new SumMetric(column));
- fieldSet.add(column);
- } else if(function.equals("avg")) {
- metrics.add(new MeanMetric(column));
- fieldSet.add(column);
- } else if(function.equals("count")) {
- metrics.add(new CountMetric());
- }
- }
- }
- return metrics.toArray(new Metric[metrics.size()]);
- }
-
- private static void validateFunction(String function) throws IOException {
- if(function.equals("min") || function.equals("max") || function.equals("sum") || function.equals("avg") || function.equals("count")) {
- return;
- } else {
- throw new IOException("Invalid function: "+function);
- }
- }
-
- private static Bucket[] getBuckets(List<String> fields, Set<String> fieldSet) {
- List<Bucket> buckets = new ArrayList();
- for(String field : fields) {
- String f = stripQuotes(field);
- buckets.add(new Bucket(f));
- fieldSet.add(f);
- }
-
- return buckets.toArray(new Bucket[buckets.size()]);
- }
-
- private static String ascDesc(String s) {
- if(s.toLowerCase(Locale.ROOT).contains("desc")) {
- return "desc";
- } else {
- return "asc";
- }
- }
-
- private static ComparatorOrder ascDescComp(String s) {
- if(s.toLowerCase(Locale.ROOT).contains("desc")) {
- return ComparatorOrder.DESCENDING;
- } else {
- return ComparatorOrder.ASCENDING;
- }
- }
-
- private static String stripQuotes(String s) {
- StringBuilder buf = new StringBuilder();
- for(int i=0; i<s.length(); i++) {
- char c = s.charAt(i);
- if(c != '"') {
- buf.append(c);
- }
- }
-
- return buf.toString();
- }
-
- private static String stripSingleQuotes(String s) {
- StringBuilder buf = new StringBuilder();
- for(int i=0; i<s.length(); i++) {
- char c = s.charAt(i);
- if(c != '\'') {
- buf.append(c);
- }
- }
-
- return buf.toString();
- }
-
-
- private static class TableSpec {
- private String collection;
- private String zkHost;
-
- public TableSpec(String table, String defaultZkHost) {
- if(table.contains("@")) {
- String[] parts = table.split("@");
- this.collection = parts[0];
- this.zkHost = parts[1];
- } else {
- this.collection = table;
- this.zkHost = defaultZkHost;
- }
- }
- }
-
- private static class ExpressionVisitor extends AstVisitor<Void, StringBuilder> {
-
- protected Void visitLogicalBinaryExpression(LogicalBinaryExpression node, StringBuilder buf) {
- buf.append("(");
- process(node.getLeft(), buf);
- buf.append(" ").append(node.getType().toString()).append(" ");
- process(node.getRight(), buf);
- buf.append(")");
- return null;
- }
-
- protected Void visitNotExpression(NotExpression node, StringBuilder buf) {
- buf.append("-");
- process(node.getValue(), buf);
- return null;
- }
-
- protected Void visitComparisonExpression(ComparisonExpression node, StringBuilder buf) {
- String field = getPredicateField(node.getLeft());
- String value = node.getRight().toString();
- value = stripSingleQuotes(value);
-
- if(!value.startsWith("(") && !value.startsWith("[")) {
- //If no parens default to a phrase search.
- value = '"'+value+'"';
- }
-
- buf.append('(').append(field + ":" + value).append(')');
- return null;
- }
- }
-
- static class SQLVisitor extends AstVisitor<Void, Integer> {
- private final StringBuilder builder;
- public String table;
- public List<String> fields = new ArrayList();
- public List<String> groupBy = new ArrayList();
- public List<SortItem> sorts;
- public String query ="*:*"; //If no query is specified pull all the records
- public int limit = -1;
- public boolean groupByQuery;
- public Expression havingExpression;
- public boolean isDistinct;
- public boolean hasColumnAliases;
- public Map<String, String> columnAliases = new HashMap();
- public Map<String, String> reverseColumnAliases = new HashMap();
-
- public SQLVisitor(StringBuilder builder) {
- this.builder = builder;
- }
-
- protected Void visitNode(Node node, Integer indent) {
- throw new UnsupportedOperationException("not yet implemented: " + node);
- }
-
- protected void reverseAliases() {
- for(String key : columnAliases.keySet()) {
- reverseColumnAliases.put(columnAliases.get(key), key);
- }
-
- //Handle the group by.
- List<String> newGroups = new ArrayList();
-
- for(String g : groupBy) {
- if (reverseColumnAliases.containsKey(g)) {
- newGroups.add(reverseColumnAliases.get(g));
- } else {
- newGroups.add(g);
- }
- }
-
- groupBy = newGroups;
- }
-
-
-
-
- protected Void visitUnnest(Unnest node, Integer indent) {
- return null;
- }
-
- protected Void visitQuery(Query node, Integer indent) {
- if(node.getWith().isPresent()) {
- With confidence = (With)node.getWith().get();
- this.append(indent.intValue(), "WITH");
- if(confidence.isRecursive()) {
- }
-
- Iterator queries = confidence.getQueries().iterator();
-
- while(queries.hasNext()) {
- WithQuery query = (WithQuery)queries.next();
- this.process(new TableSubquery(query.getQuery()), indent);
- if(queries.hasNext()) {
- }
- }
- }
-
- this.processRelation(node.getQueryBody(), indent);
- if(!node.getOrderBy().isEmpty()) {
- this.sorts = node.getOrderBy();
- }
-
- if(node.getLimit().isPresent()) {
- }
-
- if(node.getApproximate().isPresent()) {
-
- }
-
- return null;
- }
-
- protected Void visitQuerySpecification(QuerySpecification node, Integer indent) {
- this.process(node.getSelect(), indent);
- if(node.getFrom().isPresent()) {
- this.process((Node)node.getFrom().get(), indent);
- }
-
- if(node.getWhere().isPresent()) {
- Expression ex = node.getWhere().get();
- ExpressionVisitor expressionVisitor = new ExpressionVisitor();
- StringBuilder buf = new StringBuilder();
- expressionVisitor.process(ex, buf);
- this.query = buf.toString();
- }
-
- if(!node.getGroupBy().isEmpty()) {
- this.groupByQuery = true;
- List<Expression> groups = node.getGroupBy();
- for(Expression group : groups) {
- groupBy.add(getGroupField(group));
- }
- }
-
- if(node.getHaving().isPresent()) {
- this.havingExpression = node.getHaving().get();
- }
-
- if(!node.getOrderBy().isEmpty()) {
- this.sorts = node.getOrderBy();
- }
-
- if(node.getLimit().isPresent()) {
- this.limit = Integer.parseInt(stripQuotes(node.getLimit().get()));
- }
-
- return null;
- }
-
- protected Void visitComparisonExpression(ComparisonExpression node, Integer index) {
- String field = node.getLeft().toString();
- String value = node.getRight().toString();
- query = stripSingleQuotes(stripQuotes(field))+":"+stripQuotes(value);
- return null;
- }
-
- protected Void visitSelect(Select node, Integer indent) {
- this.append(indent.intValue(), "SELECT");
- if(node.isDistinct()) {
- this.isDistinct = true;
- }
-
- if(node.getSelectItems().size() > 1) {
- boolean first = true;
-
- for(Iterator var4 = node.getSelectItems().iterator(); var4.hasNext(); first = false) {
- SelectItem item = (SelectItem)var4.next();
- this.process(item, indent);
- }
- } else {
- this.process((Node) Iterables.getOnlyElement(node.getSelectItems()), indent);
- }
-
- return null;
- }
-
- protected Void visitSingleColumn(SingleColumn node, Integer indent) {
-
- Expression ex = node.getExpression();
- String field = null;
-
- if(ex instanceof QualifiedNameReference) {
-
- QualifiedNameReference ref = (QualifiedNameReference)ex;
- List<String> parts = ref.getName().getOriginalParts();
- field = parts.get(0);
-
- } else if(ex instanceof FunctionCall) {
-
- FunctionCall functionCall = (FunctionCall)ex;
- List<String> parts = functionCall.getName().getOriginalParts();
- List<Expression> args = functionCall.getArguments();
- String col = null;
-
- if(args.size() > 0 && args.get(0) instanceof QualifiedNameReference) {
- QualifiedNameReference ref = (QualifiedNameReference) args.get(0);
- col = ref.getName().getOriginalParts().get(0);
- field = parts.get(0)+"("+stripSingleQuotes(col)+")";
- } else {
- field = stripSingleQuotes(stripQuotes(functionCall.toString()));
- }
-
- } else if(ex instanceof StringLiteral) {
- StringLiteral stringLiteral = (StringLiteral)ex;
- field = stripSingleQuotes(stringLiteral.toString());
- }
-
- fields.add(field);
-
- if(node.getAlias().isPresent()) {
- String alias = node.getAlias().get();
- columnAliases.put(field, alias);
- hasColumnAliases = true;
- } else {
- columnAliases.put(field, field);
- }
-
- return null;
- }
-
-
-
-
- protected Void visitAllColumns(AllColumns node, Integer context) {
- return null;
- }
-
- protected Void visitTable(Table node, Integer indent) {
- this.table = stripSingleQuotes(node.getName().toString());
- return null;
- }
-
- protected Void visitAliasedRelation(AliasedRelation node, Integer indent) {
- this.process(node.getRelation(), indent);
- return null;
- }
-
- protected Void visitValues(Values node, Integer indent) {
- boolean first = true;
-
- for(Iterator var4 = node.getRows().iterator(); var4.hasNext(); first = false) {
- Expression row = (Expression)var4.next();
-
- }
-
- return null;
- }
-
- private void processRelation(Relation relation, Integer indent) {
- if(relation instanceof Table) {
- } else {
- this.process(relation, indent);
- }
- }
-
- private StringBuilder append(int indent, String value) {
- return this.builder.append(indentString(indent)).append(value);
- }
-
- private static String indentString(int indent) {
- return Strings.repeat(" ", indent);
- }
- }
-
- private static String getSortField(SortItem sortItem, Map<String, String> reverseColumnAliases)
- {
- String field;
- Expression ex = sortItem.getSortKey();
- if(ex instanceof QualifiedNameReference) {
- QualifiedNameReference ref = (QualifiedNameReference)ex;
- List<String> parts = ref.getName().getOriginalParts();
- field = parts.get(0);
- } else if(ex instanceof FunctionCall) {
- FunctionCall functionCall = (FunctionCall)ex;
- List<String> parts = functionCall.getName().getOriginalParts();
- List<Expression> args = functionCall.getArguments();
- String col = null;
-
- if(args.size() > 0 && args.get(0) instanceof QualifiedNameReference) {
- QualifiedNameReference ref = (QualifiedNameReference) args.get(0);
- col = ref.getName().getOriginalParts().get(0);
- field = parts.get(0)+"("+stripSingleQuotes(col)+")";
- } else {
- field = stripSingleQuotes(stripQuotes(functionCall.toString()));
- }
-
- } else {
- StringLiteral stringLiteral = (StringLiteral)ex;
- field = stripSingleQuotes(stringLiteral.toString());
- }
-
- if(reverseColumnAliases.containsKey(field)) {
- field = reverseColumnAliases.get(field);
- }
-
- return field;
- }
-
-
- private static String getHavingField(Expression ex)
- {
- String field;
- if(ex instanceof QualifiedNameReference) {
- QualifiedNameReference ref = (QualifiedNameReference)ex;
- List<String> parts = ref.getName().getOriginalParts();
- field = parts.get(0);
- } else if(ex instanceof FunctionCall) {
- FunctionCall functionCall = (FunctionCall)ex;
- List<String> parts = functionCall.getName().getOriginalParts();
- List<Expression> args = functionCall.getArguments();
- String col = null;
-
- if(args.size() > 0 && args.get(0) instanceof QualifiedNameReference) {
- QualifiedNameReference ref = (QualifiedNameReference) args.get(0);
- col = ref.getName().getOriginalParts().get(0);
- field = parts.get(0)+"("+stripSingleQuotes(col)+")";
- } else {
- field = stripSingleQuotes(stripQuotes(functionCall.toString()));
- }
-
- } else {
- StringLiteral stringLiteral = (StringLiteral)ex;
- field = stripSingleQuotes(stringLiteral.toString());
- }
-
- return field;
- }
-
-
- private static String getPredicateField(Expression ex)
- {
- String field;
- if(ex instanceof QualifiedNameReference) {
- QualifiedNameReference ref = (QualifiedNameReference)ex;
- List<String> parts = ref.getName().getOriginalParts();
- field = parts.get(0);
- } else {
- StringLiteral stringLiteral = (StringLiteral)ex;
- field = stripSingleQuotes(stringLiteral.toString());
- }
-
- return field;
- }
-
- private static String getGroupField(Expression ex)
- {
- String field;
- if(ex instanceof QualifiedNameReference) {
- QualifiedNameReference ref = (QualifiedNameReference)ex;
- List<String> parts = ref.getName().getOriginalParts();
- field = parts.get(0);
- } else {
- StringLiteral stringLiteral = (StringLiteral)ex;
- field = stripSingleQuotes(stringLiteral.toString());
- }
-
- return field;
- }
-
-
- private static class LimitStream extends TupleStream {
-
- private TupleStream stream;
- private int limit;
- private int count;
-
- public LimitStream(TupleStream stream, int limit) {
- this.stream = stream;
- this.limit = limit;
- }
-
- public void open() throws IOException {
- this.stream.open();
- }
-
- public void close() throws IOException {
- this.stream.close();
- }
-
- public List<TupleStream> children() {
- List<TupleStream> children = new ArrayList();
- children.add(stream);
- return children;
- }
-
- public StreamComparator getStreamSort(){
- return stream.getStreamSort();
- }
-
- public void setStreamContext(StreamContext context) {
- stream.setStreamContext(context);
- }
-
- @Override
- public Explanation toExplanation(StreamFactory factory) throws IOException {
-
- return new StreamExplanation(getStreamNodeId().toString())
- .withChildren(new Explanation[]{
- stream.toExplanation(factory)
- })
- .withFunctionName("SQL LIMIT")
- .withExpression("--non-expressible--")
- .withImplementingClass(this.getClass().getName())
- .withExpressionType(ExpressionType.STREAM_DECORATOR);
- }
-
- public Tuple read() throws IOException {
- ++count;
- if(count > limit) {
- Map fields = new HashMap();
- fields.put("EOF", "true");
- return new Tuple(fields);
- }
-
- Tuple tuple = stream.read();
- return tuple;
- }
- }
-
- public static enum AggregationMode {
-
- MAP_REDUCE,
- FACET;
-
- public static AggregationMode getMode(String mode) throws IOException{
- if(mode.equalsIgnoreCase("facet")) {
- return FACET;
- } else if(mode.equalsIgnoreCase("map_reduce")) {
- return MAP_REDUCE;
- } else {
- throw new IOException("Invalid aggregation mode:"+mode);
- }
- }
- }
-
- private static class HavingStream extends TupleStream {
-
- private TupleStream stream;
- private HavingVisitor havingVisitor;
- private Expression havingExpression;
-
- public HavingStream(TupleStream stream, Expression havingExpression, Map<String, String> reverseAliasMap) {
- this.stream = stream;
- this.havingVisitor = new HavingVisitor(reverseAliasMap);
- this.havingExpression = havingExpression;
- }
-
- public void open() throws IOException {
- this.stream.open();
- }
-
- public void close() throws IOException {
- this.stream.close();
- }
-
- public StreamComparator getStreamSort(){
- return stream.getStreamSort();
- }
-
- public List<TupleStream> children() {
- List<TupleStream> children = new ArrayList();
- children.add(stream);
- return children;
- }
-
- @Override
- public Explanation toExplanation(StreamFactory factory) throws IOException {
-
- return new StreamExplanation(getStreamNodeId().toString())
- .withChildren(new Explanation[]{
- stream.toExplanation(factory)
- })
- .withFunctionName("SQL HAVING")
- .withExpression("--non-expressible--")
- .withImplementingClass(this.getClass().getName())
- .withExpressionType(ExpressionType.STREAM_DECORATOR);
- }
-
- public void setStreamContext(StreamContext context) {
- stream.setStreamContext(context);
- }
-
- public Tuple read() throws IOException {
- while (true) {
- Tuple tuple = stream.read();
- if (tuple.EOF) {
- return tuple;
- }
-
- if (havingVisitor.process(havingExpression, tuple)) {
- return tuple;
- }
- }
- }
- }
-
- private static class CatalogsStream extends TupleStream {
- private final String zkHost;
- private StreamContext context;
- private int currentIndex = 0;
- private List<String> catalogs;
-
- CatalogsStream(String zkHost) {
- this.zkHost = zkHost;
- }
-
- public List<TupleStream> children() {
- return new ArrayList<>();
- }
-
- public void open() throws IOException {
- this.catalogs = new ArrayList<>();
- this.catalogs.add(this.zkHost);
- }
-
- @Override
- public Explanation toExplanation(StreamFactory factory) throws IOException {
-
- return new StreamExplanation(getStreamNodeId().toString())
- .withFunctionName("SQL CATALOG")
- .withExpression("--non-expressible--")
- .withImplementingClass(this.getClass().getName())
- .withExpressionType(ExpressionType.STREAM_DECORATOR);
- }
-
- public Tuple read() throws IOException {
- Map<String, String> fields = new HashMap<>();
- if (this.currentIndex < this.catalogs.size()) {
- fields.put("TABLE_CAT", this.catalogs.get(this.currentIndex));
- this.currentIndex += 1;
- } else {
- fields.put("EOF", "true");
- }
- return new Tuple(fields);
- }
-
- public StreamComparator getStreamSort() {
- return null;
- }
-
- public void close() throws IOException {
-
- }
-
- public void setStreamContext(StreamContext context) {
- this.context = context;
- }
- }
-
- private static class SchemasStream extends TupleStream {
- private final String zkHost;
- private StreamContext context;
-
- SchemasStream(String zkHost) {
- this.zkHost = zkHost;
- }
-
- public List<TupleStream> children() {
- return new ArrayList<>();
- }
-
- public void open() throws IOException {
-
- }
-
- @Override
- public Explanation toExplanation(StreamFactory factory) throws IOException {
-
- return new StreamExplanation(getStreamNodeId().toString())
- .withFunctionName("SQL SCHEMA")
- .withExpression("--non-expressible--")
- .withImplementingClass(this.getClass().getName())
- .withExpressionType(ExpressionType.STREAM_DECORATOR);
- }
-
- public Tuple read() throws IOException {
- Map<String, String> fields = new HashMap<>();
- fields.put("EOF", "true");
- return new Tuple(fields);
- }
-
- public StreamComparator getStreamSort() {
- return null;
- }
-
- public void close() throws IOException {
-
- }
-
- public void setStreamContext(StreamContext context) {
- this.context = context;
- }
- }
-
- private static class TableStream extends TupleStream {
- private final String zkHost;
- private StreamContext context;
- private int currentIndex = 0;
- private List<String> tables;
-
- TableStream(String zkHost) {
- this.zkHost = zkHost;
- }
-
- public List<TupleStream> children() {
- return new ArrayList<>();
- }
-
- public void open() throws IOException {
- this.tables = new ArrayList<>();
-
- CloudSolrClient cloudSolrClient = this.context.getSolrClientCache().getCloudSolrClient(this.zkHost);
- cloudSolrClient.connect();
- ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
- if (zkStateReader.getClusterState().getCollections().size() != 0) {
- this.tables.addAll(zkStateReader.getClusterState().getCollections());
- }
- Collections.sort(this.tables);
- }
-
- @Override
- public Explanation toExplanation(StreamFactory factory) throws IOException {
-
- return new StreamExplanation(getStreamNodeId().toString())
- .withFunctionName("SQL TABLE")
- .withExpression("--non-expressible--")
- .withImplementingClass(this.getClass().getName())
- .withExpressionType(ExpressionType.STREAM_DECORATOR);
- }
-
- public Tuple read() throws IOException {
- Map<String, String> fields = new HashMap<>();
- if (this.currentIndex < this.tables.size()) {
- fields.put("TABLE_CAT", this.zkHost);
- fields.put("TABLE_SCHEM", null);
- fields.put("TABLE_NAME", this.tables.get(this.currentIndex));
- fields.put("TABLE_TYPE", "TABLE");
- fields.put("REMARKS", null);
- this.currentIndex += 1;
- } else {
- fields.put("EOF", "true");
- }
- return new Tuple(fields);
- }
-
- public StreamComparator getStreamSort() {
- return null;
- }
-
- public void close() throws IOException {
-
- }
-
- public void setStreamContext(StreamContext context) {
- this.context = context;
- }
- }
-
- private static class MetadataStream extends TupleStream {
-
- private final TupleStream stream;
- private final SQLVisitor sqlVisitor;
- private boolean firstTuple = true;
-
- public MetadataStream(TupleStream stream, SQLVisitor sqlVistor) {
- this.stream = stream;
- this.sqlVisitor = sqlVistor;
- }
-
- public List<TupleStream> children() {
- return this.stream.children();
- }
-
- public void open() throws IOException {
- this.stream.open();
- }
-
- @Override
- public Explanation toExplanation(StreamFactory factory) throws IOException {
-
- return new StreamExplanation(getStreamNodeId().toString())
- .withChildren(new Explanation[]{
- stream.toExplanation(factory)
- })
- .withFunctionName("SQL METADATA")
- .withExpression("--non-expressible--")
- .withImplementingClass(this.getClass().getName())
- .withExpressionType(ExpressionType.STREAM_DECORATOR);
- }
-
- // Return a metadata tuple as the first tuple and then pass through to the underlying stream.
- public Tuple read() throws IOException {
- if(firstTuple) {
- firstTuple = false;
-
- Map fields = new HashMap<>();
- fields.put("isMetadata", true);
- fields.put("fields", sqlVisitor.fields);
- fields.put("aliases", sqlVisitor.columnAliases);
- return new Tuple(fields);
- }
-
- return this.stream.read();
- }
-
- public StreamComparator getStreamSort() {
- return this.stream.getStreamSort();
- }
-
- public void close() throws IOException {
- this.stream.close();
- }
-
- public void setStreamContext(StreamContext context) {
- this.stream.setStreamContext(context);
- }
- }
-
- private static class HavingVisitor extends AstVisitor<Boolean, Tuple> {
-
- private Map<String,String> reverseAliasMap;
-
- public HavingVisitor(Map<String, String> reverseAliasMap) {
- this.reverseAliasMap = reverseAliasMap;
- }
-
- protected Boolean visitLogicalBinaryExpression(LogicalBinaryExpression node, Tuple tuple) {
-
- Boolean b = process(node.getLeft(), tuple);
- if(node.getType() == LogicalBinaryExpression.Type.AND) {
- if(!b) {
- //Short circuit
- return false;
- } else {
- return process(node.getRight(), tuple);
- }
- } else {
- if(b) {
- //Short circuit
- return true;
- } else {
- return process(node.getRight(), tuple);
- }
- }
- }
-
- protected Boolean visitComparisonExpression(ComparisonExpression node, Tuple tuple) {
- String field = getHavingField(node.getLeft());
-
- if(reverseAliasMap.containsKey(field)) {
- field = reverseAliasMap.get(field);
- }
-
- double d = Double.parseDouble(node.getRight().toString());
- double td = tuple.getDouble(field);
- ComparisonExpression.Type t = node.getType();
-
- switch(t) {
- case LESS_THAN:
- return td < d;
- case LESS_THAN_OR_EQUAL:
- return td <= d;
- case NOT_EQUAL:
- return td != d;
- case EQUAL:
- return td == d;
- case GREATER_THAN:
- return td > d;
- case GREATER_THAN_OR_EQUAL:
- return td >= d;
- default:
- return false;
- }
- }
- }
- }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b08a4636/solr/core/src/java/org/apache/solr/handler/sql/SolrEnumerator.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/sql/SolrEnumerator.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrEnumerator.java
new file mode 100644
index 0000000..8697b07
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrEnumerator.java
@@ -0,0 +1,90 @@
+package org.apache.solr.handler.sql;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.TupleStream;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Enumerator that reads from a Solr collection. */
+class SolrEnumerator implements Enumerator<Object> {
+ private final TupleStream tupleStream;
+ private final List<String> fields;
+ private Tuple current;
+
+ /** Creates a SolrEnumerator.
+ *
+ * @param tupleStream Solr TupleStream
+ * @param fields Fields to get from each Tuple
+ */
+ SolrEnumerator(TupleStream tupleStream, List<String> fields) {
+ this.tupleStream = tupleStream;
+ this.fields = fields;
+ this.current = null;
+ }
+
+ /** Produce the next row from the results
+ *
+ * @return A new row from the results
+ */
+ public Object current() {
+ if (fields.size() == 1) {
+ return current.get(fields.get(0));
+ } else {
+ // Build an array with all fields in this row
+ Object[] row = new Object[fields.size()];
+ for (int i = 0; i < fields.size(); i++) {
+ row[i] = current.get(fields.get(i));
+ }
+
+ return row;
+ }
+ }
+
+ public boolean moveNext() {
+ try {
+ Tuple tuple = this.tupleStream.read();
+ if (tuple.EOF) {
+ return false;
+ } else {
+ current = tuple;
+ return true;
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ return false;
+ }
+ }
+
+ public void reset() {
+ throw new UnsupportedOperationException();
+ }
+
+ public void close() {
+ if(this.tupleStream != null) {
+ try {
+ this.tupleStream.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b08a4636/solr/core/src/java/org/apache/solr/handler/sql/SolrFilter.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/sql/SolrFilter.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrFilter.java
new file mode 100644
index 0000000..12113b4
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrFilter.java
@@ -0,0 +1,170 @@
+package org.apache.solr.handler.sql;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation of a {@link org.apache.calcite.rel.core.Filter} relational expression in Solr.
+ */
+public class SolrFilter extends Filter implements SolrRel {
+ public SolrFilter(
+ RelOptCluster cluster,
+ RelTraitSet traitSet,
+ RelNode child,
+ RexNode condition) {
+ super(cluster, traitSet, child, condition);
+ assert getConvention() == SolrRel.CONVENTION;
+ assert getConvention() == child.getConvention();
+ }
+
+ @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+ return super.computeSelfCost(planner, mq).multiplyBy(0.1);
+ }
+
+ public SolrFilter copy(RelTraitSet traitSet, RelNode input, RexNode condition) {
+ return new SolrFilter(getCluster(), traitSet, input, condition);
+ }
+
+ public void implement(Implementor implementor) {
+ implementor.visitChild(0, getInput());
+ Translator translator = new Translator(SolrRules.solrFieldNames(getRowType()));
+ List<String> fqs = translator.translateMatch(condition);
+ implementor.add(null, fqs);
+ }
+
+ /** Translates {@link RexNode} expressions into Solr fq strings. */
+ private static class Translator {
+ private final List<String> fieldNames;
+
+ Translator(List<String> fieldNames) {
+ this.fieldNames = fieldNames;
+ }
+
+ private List<String> translateMatch(RexNode condition) {
+ return translateOr(condition);
+ }
+
+ private List<String> translateOr(RexNode condition) {
+ List<String> list = new ArrayList<>();
+ for (RexNode node : RelOptUtil.disjunctions(condition)) {
+ list.add(translateAnd(node));
+ }
+ return list;
+ }
+
+ /** Translates a condition that may be an AND of other conditions. Gathers
+ * together conditions that apply to the same field. */
+ private String translateAnd(RexNode node0) {
+ List<String> ands = new ArrayList<>();
+ for (RexNode node : RelOptUtil.conjunctions(node0)) {
+ ands.add(translateMatch2(node));
+ }
+
+ return String.join(" AND ", ands);
+ }
+
+ private String translateMatch2(RexNode node) {
+ switch (node.getKind()) {
+ case EQUALS:
+ return translateBinary(null, null, (RexCall) node);
+// case LESS_THAN:
+// return translateBinary("$lt", "$gt", (RexCall) node);
+// case LESS_THAN_OR_EQUAL:
+// return translateBinary("$lte", "$gte", (RexCall) node);
+// case NOT_EQUALS:
+// return translateBinary("$ne", "$ne", (RexCall) node);
+// case GREATER_THAN:
+// return translateBinary("$gt", "$lt", (RexCall) node);
+// case GREATER_THAN_OR_EQUAL:
+// return translateBinary("$gte", "$lte", (RexCall) node);
+ default:
+ throw new AssertionError("cannot translate " + node);
+ }
+ }
+
+ /** Translates a call to a binary operator, reversing arguments if necessary. */
+ private String translateBinary(String op, String rop, RexCall call) {
+ final RexNode left = call.operands.get(0);
+ final RexNode right = call.operands.get(1);
+ String b = translateBinary2(op, left, right);
+ if (b != null) {
+ return b;
+ }
+ b = translateBinary2(rop, right, left);
+ if (b != null) {
+ return b;
+ }
+ throw new AssertionError("cannot translate op " + op + " call " + call);
+ }
+
+ /** Translates a call to a binary operator. Returns whether successful. */
+ private String translateBinary2(String op, RexNode left, RexNode right) {
+ switch (right.getKind()) {
+ case LITERAL:
+ break;
+ default:
+ return null;
+ }
+ final RexLiteral rightLiteral = (RexLiteral) right;
+ switch (left.getKind()) {
+ case INPUT_REF:
+ final RexInputRef left1 = (RexInputRef) left;
+ String name = fieldNames.get(left1.getIndex());
+ return translateOp2(op, name, rightLiteral);
+ case CAST:
+ return translateBinary2(op, ((RexCall) left).operands.get(0), right);
+ case OTHER_FUNCTION:
+// String itemName = SolrRules.isItem((RexCall) left);
+// if (itemName != null) {
+// return translateOp2(op, itemName, rightLiteral);
+// }
+ // fall through
+ default:
+ return null;
+ }
+ }
+
+ private String translateOp2(String op, String name, RexLiteral right) {
+ if (op == null) {
+ // E.g.: {deptno: 100}
+ return name + ":" + right.getValue2();
+ } else {
+// // E.g. {deptno: {$lt: 100}}
+// // which may later be combined with other conditions:
+// // E.g. {deptno: [$lt: 100, $gt: 50]}
+// multimap.put(name, Pair.of(op, right));
+ return null;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b08a4636/solr/core/src/java/org/apache/solr/handler/sql/SolrMethod.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/sql/SolrMethod.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrMethod.java
new file mode 100644
index 0000000..7e3fae2
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrMethod.java
@@ -0,0 +1,47 @@
+package org.apache.solr.handler.sql;
+
+import java.lang.reflect.Method;
+import java.util.List;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.calcite.linq4j.tree.Types;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Builtin methods in the Solr adapter.
+ */
+public enum SolrMethod {
+ SOLR_QUERYABLE_QUERY(SolrTable.SolrQueryable.class, "query", List.class, List.class, List.class, String.class);
+
+ public final Method method;
+
+ public static final ImmutableMap<Method, SolrMethod> MAP;
+
+ static {
+ final ImmutableMap.Builder<Method, SolrMethod> builder = ImmutableMap.builder();
+ for (SolrMethod value : SolrMethod.values()) {
+ builder.put(value.method, value);
+ }
+ MAP = builder.build();
+ }
+
+ SolrMethod(Class clazz, String methodName, Class... argumentTypes) {
+ this.method = Types.lookupMethod(clazz, methodName, argumentTypes);
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b08a4636/solr/core/src/java/org/apache/solr/handler/sql/SolrProject.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/sql/SolrProject.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrProject.java
new file mode 100644
index 0000000..ee44dd1
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrProject.java
@@ -0,0 +1,69 @@
+package org.apache.solr.handler.sql;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Pair;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation of {@link org.apache.calcite.rel.core.Project} relational expression in Solr.
+ */
+public class SolrProject extends Project implements SolrRel {
+ public SolrProject(RelOptCluster cluster, RelTraitSet traitSet,
+ RelNode input, List<? extends RexNode> projects, RelDataType rowType) {
+ super(cluster, traitSet, input, projects, rowType);
+ assert getConvention() == SolrRel.CONVENTION;
+ assert getConvention() == input.getConvention();
+ }
+
+ @Override
+ public Project copy(RelTraitSet traitSet, RelNode input, List<RexNode> projects, RelDataType rowType) {
+ return new SolrProject(getCluster(), traitSet, input, projects, rowType);
+ }
+
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+ return super.computeSelfCost(planner, mq).multiplyBy(0.1);
+ }
+
+ public void implement(Implementor implementor) {
+ implementor.visitChild(0, getInput());
+ final SolrRules.RexToSolrTranslator translator = new SolrRules.RexToSolrTranslator(
+ (JavaTypeFactory) getCluster().getTypeFactory(), SolrRules.solrFieldNames(getInput().getRowType()));
+ final Map<String, String> fieldMappings = new HashMap<>();
+ for (Pair<RexNode, String> pair : getNamedProjects()) {
+ final String name = pair.right;
+ final String expr = pair.left.accept(translator);
+ fieldMappings.put(name, expr);
+ }
+ implementor.add(fieldMappings, null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b08a4636/solr/core/src/java/org/apache/solr/handler/sql/SolrRel.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/sql/SolrRel.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrRel.java
new file mode 100644
index 0000000..70dd8cc
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrRel.java
@@ -0,0 +1,75 @@
+package org.apache.solr.handler.sql;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelNode;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
/**
 * Relational expression that uses Solr calling convention.
 */
public interface SolrRel extends RelNode {
  /** Contributes this node's part of the overall Solr query to the implementor. */
  void implement(Implementor implementor);

  /** Calling convention for relational operations that occur in Solr. */
  Convention CONVENTION = new Convention.Impl("SOLR", SolrRel.class);

  /** Callback for the implementation process that converts a tree of {@link SolrRel} nodes into a Solr query. */
  class Implementor {
    // Output name -> Solr expression, accumulated from projections.
    final Map<String, String> fieldMappings = new HashMap<>();
    // Filter-query strings accumulated from filters.
    final List<String> filterQueries = new ArrayList<>();
    // Row limit as a string; null means no limit was specified.
    String limitValue = null;
    // Sort specifications, in the order they should be applied.
    final List<String> order = new ArrayList<>();

    RelOptTable table;
    SolrTable solrTable;

    /** Adds newly projected fields and restricted filterQueries.
     *
     * @param fieldMappings New fields to be projected from a query; may be null
     * @param filterQueries New filterQueries to be applied to the query; may be null
     */
    public void add(Map<String, String> fieldMappings, List<String> filterQueries) {
      if (fieldMappings != null) {
        this.fieldMappings.putAll(fieldMappings);
      }
      if (filterQueries != null) {
        this.filterQueries.addAll(filterQueries);
      }
    }

    /** Appends additional sort specifications. */
    public void addOrder(List<String> newOrder) {
      order.addAll(newOrder);
    }

    /** Sets (replaces) the row limit for the query. */
    public void setLimit(String limit) {
      limitValue = limit;
    }

    /** Recurses into the (single) child so it can contribute to this query. */
    public void visitChild(int ordinal, RelNode input) {
      assert ordinal == 0;
      ((SolrRel) input).implement(this);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b08a4636/solr/core/src/java/org/apache/solr/handler/sql/SolrRules.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/sql/SolrRules.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrRules.java
new file mode 100644
index 0000000..d1fdbce
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrRules.java
@@ -0,0 +1,246 @@
+package org.apache.solr.handler.sql;
+
+import java.util.AbstractList;
+import java.util.List;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexVisitorImpl;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
/**
 * Rules and relational operators for
 * {@link SolrRel#CONVENTION}
 * calling convention.
 */
public class SolrRules {
  // Utility class; not meant to be instantiated.
  private SolrRules() {}

  /** Planner rules that convert logical operators into Solr operators. */
  static final RelOptRule[] RULES = {
      SolrFilterRule.INSTANCE,
      SolrProjectRule.INSTANCE,
//      SolrSortRule.INSTANCE
  };

  /** Returns the field names of the given row type, in ordinal order,
   * uniquified so duplicates get distinct names. */
  static List<String> solrFieldNames(final RelDataType rowType) {
    return SqlValidatorUtil.uniquify(
        new AbstractList<String>() {
          @Override
          public String get(int index) {
            return rowType.getFieldList().get(index).getName();
          }

          @Override
          public int size() {
            return rowType.getFieldCount();
          }
        });
  }

  /** Translator from {@link RexNode} to strings in Solr's expression language.
   * Currently only input references (plain field names) are handled; any other
   * node kind falls through to the {@link RexVisitorImpl} default. */
  static class RexToSolrTranslator extends RexVisitorImpl<String> {
    private final JavaTypeFactory typeFactory;
    // Field names of the input row, indexed by ordinal.
    private final List<String> inFields;

    RexToSolrTranslator(JavaTypeFactory typeFactory, List<String> inFields) {
      super(true);
      this.typeFactory = typeFactory;
      this.inFields = inFields;
    }

    @Override
    public String visitInputRef(RexInputRef inputRef) {
      // An input reference translates directly to its field name.
      return inFields.get(inputRef.getIndex());
    }
  }

  /** Base class for planner rules that convert a relational expression to Solr calling convention. */
  abstract static class SolrConverterRule extends ConverterRule {
    // Target convention; always SolrRel.CONVENTION.
    final Convention out;

    /** Creates a rule that matches every instance of {@code clazz}. */
    public SolrConverterRule(Class<? extends RelNode> clazz, String description) {
      this(clazz, Predicates.<RelNode>alwaysTrue(), description);
    }

    /** Creates a rule that matches instances of {@code clazz} satisfying {@code predicate}. */
    public <R extends RelNode> SolrConverterRule(Class<R> clazz, Predicate<? super R> predicate, String description) {
      super(clazz, predicate, Convention.NONE, SolrRel.CONVENTION, description);
      this.out = SolrRel.CONVENTION;
    }
  }

  /**
   * Rule to convert a {@link LogicalFilter} to a {@link SolrFilter}.
   */
  private static class SolrFilterRule extends SolrConverterRule {
    private static final SolrFilterRule INSTANCE = new SolrFilterRule();

    private SolrFilterRule() {
      super(LogicalFilter.class, "SolrFilterRule");
    }

    public RelNode convert(RelNode rel) {
      final LogicalFilter filter = (LogicalFilter) rel;
      final RelTraitSet traitSet = filter.getTraitSet().replace(out);
      return new SolrFilter(
          rel.getCluster(),
          traitSet,
          convert(filter.getInput(), out),
          filter.getCondition());
    }
  }

  /**
   * Rule to convert a {@link org.apache.calcite.rel.logical.LogicalProject} to a {@link SolrProject}.
   */
  private static class SolrProjectRule extends SolrConverterRule {
    private static final SolrProjectRule INSTANCE = new SolrProjectRule();

    private SolrProjectRule() {
      super(LogicalProject.class, "SolrProjectRule");
    }

    public RelNode convert(RelNode rel) {
      final LogicalProject project = (LogicalProject) rel;
      final RelTraitSet traitSet = project.getTraitSet().replace(out);
      return new SolrProject(project.getCluster(), traitSet,
          convert(project.getInput(), out), project.getProjects(), project.getRowType());
    }
  }

  /**
   * Rule to convert a {@link org.apache.calcite.rel.core.Sort} to a {@link SolrSort}.
   *
   * NOTE(review): the implementation below is disabled and appears to be
   * adapted from the Cassandra adapter (references to CQL, partitions and
   * implicit collations) — it needs to be reworked for Solr before enabling.
   */
// private static class SolrSortRule extends RelOptRule {
//   private static final com.google.common.base.Predicate<Sort> SORT_PREDICATE =
//       input -> {
//         // CQL has no support for offsets
//         return input.offset == null;
//       };
//   private static final com.google.common.base.Predicate<SolrFilter> FILTER_PREDICATE =
//       input -> {
//         // We can only use implicit sorting within a single partition
//         return input.isSinglePartition();
//       };
//   private static final RelOptRuleOperand SOLR_OP =
//       operand(SolrToEnumerableConverter.class,
//           operand(SolrFilter.class, null, FILTER_PREDICATE, any()));
//
//   private static final SolrSortRule INSTANCE = new SolrSortRule();
//
//   private SolrSortRule() {
//     super(operand(Sort.class, null, SORT_PREDICATE, SOLR_OP), "SolrSortRule");
//   }
//
//   public RelNode convert(Sort sort, SolrFilter filter) {
//     final RelTraitSet traitSet =
//         sort.getTraitSet().replace(SolrRel.CONVENTION)
//             .replace(sort.getCollation());
//     return new SolrSort(sort.getCluster(), traitSet,
//         convert(sort.getInput(), traitSet.replace(RelCollations.EMPTY)),
//         sort.getCollation(), filter.getImplicitCollation(), sort.fetch);
//   }
//
//   public boolean matches(RelOptRuleCall call) {
//     final Sort sort = call.rel(0);
//     final SolrFilter filter = call.rel(2);
//     return collationsCompatible(sort.getCollation(), filter.getImplicitCollation());
//   }
//
//   /** Check if it is possible to exploit native CQL sorting for a given collation.
//    *
//    * @return True if it is possible to achieve this sort in Solr
//    */
//   private boolean collationsCompatible(RelCollation sortCollation, RelCollation implicitCollation) {
//     List<RelFieldCollation> sortFieldCollations = sortCollation.getFieldCollations();
//     List<RelFieldCollation> implicitFieldCollations = implicitCollation.getFieldCollations();
//
//     if (sortFieldCollations.size() > implicitFieldCollations.size()) {
//       return false;
//     }
//     if (sortFieldCollations.size() == 0) {
//       return true;
//     }
//
//     // Check if we need to reverse the order of the implicit collation
//     boolean reversed = reverseDirection(sortFieldCollations.get(0).getDirection())
//         == implicitFieldCollations.get(0).getDirection();
//
//     for (int i = 0; i < sortFieldCollations.size(); i++) {
//       RelFieldCollation sorted = sortFieldCollations.get(i);
//       RelFieldCollation implied = implicitFieldCollations.get(i);
//
//       // Check that the fields being sorted match
//       if (sorted.getFieldIndex() != implied.getFieldIndex()) {
//         return false;
//       }
//
//       // Either all fields must be sorted in the same direction
//       // or the opposite direction based on whether we decided
//       // if the sort direction should be reversed above
//       RelFieldCollation.Direction sortDirection = sorted.getDirection();
//       RelFieldCollation.Direction implicitDirection = implied.getDirection();
//       if ((!reversed && sortDirection != implicitDirection)
//           || (reversed && reverseDirection(sortDirection) != implicitDirection)) {
//         return false;
//       }
//     }
//
//     return true;
//   }
//
//   /** Find the reverse of a given collation direction.
//    *
//    * @return Reverse of the input direction
//    */
//   private RelFieldCollation.Direction reverseDirection(RelFieldCollation.Direction direction) {
//     switch(direction) {
//     case ASCENDING:
//     case STRICTLY_ASCENDING:
//       return RelFieldCollation.Direction.DESCENDING;
//     case DESCENDING:
//     case STRICTLY_DESCENDING:
//       return RelFieldCollation.Direction.ASCENDING;
//     default:
//       return null;
//     }
//   }
//
//   /** @see org.apache.calcite.rel.convert.ConverterRule */
//   public void onMatch(RelOptRuleCall call) {
//     final Sort sort = call.rel(0);
//     SolrFilter filter = call.rel(2);
//     final RelNode converted = convert(sort, filter);
//     if (converted != null) {
//       call.transformTo(converted);
//     }
//   }
// }
}