You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jb...@apache.org on 2017/02/13 16:46:45 UTC

lucene-solr:jira/solr-8593: SOLR-8593: Make SQL handler friendlier out of the box

Repository: lucene-solr
Updated Branches:
  refs/heads/jira/solr-8593 de512d740 -> ec6ee96ae


SOLR-8593: Make SQL handler friendlier out of the box


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/ec6ee96a
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/ec6ee96a
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/ec6ee96a

Branch: refs/heads/jira/solr-8593
Commit: ec6ee96ae6df1fdb2fffd881b45cb48670a10c5b
Parents: de512d7
Author: Joel Bernstein <jb...@apache.org>
Authored: Mon Feb 13 11:46:08 2017 -0500
Committer: Joel Bernstein <jb...@apache.org>
Committed: Mon Feb 13 11:46:08 2017 -0500

----------------------------------------------------------------------
 .../org/apache/solr/handler/SQLHandler.java     |   2 +-
 .../apache/solr/handler/sql/SolrEnumerator.java |  13 ++
 .../org/apache/solr/handler/sql/SolrFilter.java |  26 +--
 .../org/apache/solr/handler/sql/SolrSchema.java |   7 +-
 .../org/apache/solr/handler/sql/SolrTable.java  | 207 ++++++++++++++++---
 .../org/apache/solr/handler/TestSQLHandler.java |  74 ++++---
 .../solr/client/solrj/io/stream/JDBCStream.java |  28 ++-
 7 files changed, 269 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ec6ee96a/solr/core/src/java/org/apache/solr/handler/SQLHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/SQLHandler.java b/solr/core/src/java/org/apache/solr/handler/SQLHandler.java
index 549efac..d65ea56 100644
--- a/solr/core/src/java/org/apache/solr/handler/SQLHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/SQLHandler.java
@@ -79,7 +79,7 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware, Per
     params.set("numWorkers", params.getInt("numWorkers", 1));
     params.set("workerCollection", params.get("workerCollection", defaultWorkerCollection));
     params.set("workerZkhost", params.get("workerZkhost", defaultZkhost));
-    params.set("aggregationMode", params.get("aggregationMode", "map_reduce"));
+    params.set("aggregationMode", params.get("aggregationMode", "facet"));
 
     TupleStream tupleStream = null;
     try {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ec6ee96a/solr/core/src/java/org/apache/solr/handler/sql/SolrEnumerator.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/sql/SolrEnumerator.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrEnumerator.java
index 4299e61..6f9dddf 100644
--- a/solr/core/src/java/org/apache/solr/handler/sql/SolrEnumerator.java
+++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrEnumerator.java
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.List;
+import java.util.ArrayList;
 import java.util.Map;
 
 /** Enumerator that reads from a Solr collection. */
@@ -34,6 +35,7 @@ class SolrEnumerator implements Enumerator<Object> {
   private final TupleStream tupleStream;
   private final List<Map.Entry<String, Class>> fields;
   private Tuple current;
+  private char sep = 31;
 
   /** Creates a SolrEnumerator.
    *
@@ -84,6 +86,17 @@ class SolrEnumerator implements Enumerator<Object> {
       return val;
     }
 
+    if(val instanceof ArrayList) {
+      ArrayList arrayList = (ArrayList) val;
+      StringBuilder buf = new StringBuilder();
+
+      for(Object o : arrayList) {
+        buf.append(sep);
+        buf.append(o.toString());
+      }
+      val = buf.toString();
+    }
+
     return val;
   }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ec6ee96a/solr/core/src/java/org/apache/solr/handler/sql/SolrFilter.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/sql/SolrFilter.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrFilter.java
index 50102b1..ce12aec 100644
--- a/solr/core/src/java/org/apache/solr/handler/sql/SolrFilter.java
+++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrFilter.java
@@ -132,7 +132,8 @@ class SolrFilter extends Filter implements SolrRel {
         case NOT:
           return "-" + translateComparison(((RexCall) node).getOperands().get(0));
         case EQUALS:
-          String terms = binaryTranslated.getValue().getValue2().toString().trim();
+          String terms = binaryTranslated.getValue().toString().trim();
+          terms = terms.replace("'","");
           if (!terms.startsWith("(") && !terms.startsWith("[") && !terms.startsWith("{")) {
             terms = "\"" + terms + "\"";
           }
@@ -141,19 +142,19 @@ class SolrFilter extends Filter implements SolrRel {
           this.negativeQuery = false;
           return clause;
         case NOT_EQUALS:
-          return "-(" + binaryTranslated.getKey() + ":" + binaryTranslated.getValue().getValue2() + ")";
+          return "-(" + binaryTranslated.getKey() + ":" + binaryTranslated.getValue() + ")";
         case LESS_THAN:
           this.negativeQuery = false;
-          return "(" + binaryTranslated.getKey() + ": [ * TO " + binaryTranslated.getValue().getValue2() + " })";
+          return "(" + binaryTranslated.getKey() + ": [ * TO " + binaryTranslated.getValue() + " })";
         case LESS_THAN_OR_EQUAL:
           this.negativeQuery = false;
-          return "(" + binaryTranslated.getKey() + ": [ * TO " + binaryTranslated.getValue().getValue2() + " ])";
+          return "(" + binaryTranslated.getKey() + ": [ * TO " + binaryTranslated.getValue() + " ])";
         case GREATER_THAN:
           this.negativeQuery = false;
-          return "(" + binaryTranslated.getKey() + ": { " + binaryTranslated.getValue().getValue2() + " TO * ])";
+          return "(" + binaryTranslated.getKey() + ": { " + binaryTranslated.getValue() + " TO * ])";
         case GREATER_THAN_OR_EQUAL:
           this.negativeQuery = false;
-          return "(" + binaryTranslated.getKey() + ": [ " + binaryTranslated.getValue().getValue2() + " TO * ])";
+          return "(" + binaryTranslated.getKey() + ": [ " + binaryTranslated.getValue() + " TO * ])";
         default:
           throw new AssertionError("cannot translate " + node);
       }
@@ -305,21 +306,20 @@ class SolrFilter extends Filter implements SolrRel {
       }
 
       switch (node.getKind()) {
-
         case EQUALS:
-          String terms = binaryTranslated.getValue().getValue2().toString().trim();
+          String terms = binaryTranslated.getValue().toString().trim();
           String clause = "eq(" + binaryTranslated.getKey() + "," + terms + ")";
           return clause;
         case NOT_EQUALS:
-          return "not(eq(" + binaryTranslated.getKey() + "," + binaryTranslated.getValue().getValue2() + "))";
+          return "not(eq(" + binaryTranslated.getKey() + "," + binaryTranslated.getValue() + "))";
         case LESS_THAN:
-          return "lt(" + binaryTranslated.getKey() + "," + binaryTranslated.getValue().getValue2() + ")";
+          return "lt(" + binaryTranslated.getKey() + "," + binaryTranslated.getValue() + ")";
         case LESS_THAN_OR_EQUAL:
-          return "lteq(" + binaryTranslated.getKey() + "," + binaryTranslated.getValue().getValue2() + ")";
+          return "lteq(" + binaryTranslated.getKey() + "," + binaryTranslated.getValue() + ")";
         case GREATER_THAN:
-          return "gt(" + binaryTranslated.getKey() + "," + binaryTranslated.getValue().getValue2() + ")";
+          return "gt(" + binaryTranslated.getKey() + "," + binaryTranslated.getValue() + ")";
         case GREATER_THAN_OR_EQUAL:
-          return "gteq(" + binaryTranslated.getKey() + "," + binaryTranslated.getValue().getValue2() + ")";
+          return "gteq(" + binaryTranslated.getKey() + "," + binaryTranslated.getValue() + ")";
         default:
           throw new AssertionError("cannot translate " + node);
       }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ec6ee96a/solr/core/src/java/org/apache/solr/handler/sql/SolrSchema.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/sql/SolrSchema.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrSchema.java
index 221c2b6..83fa537 100644
--- a/solr/core/src/java/org/apache/solr/handler/sql/SolrSchema.java
+++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrSchema.java
@@ -90,6 +90,7 @@ class SolrSchema extends AbstractSchema {
     final RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
     final RelDataTypeFactory.FieldInfoBuilder fieldInfo = typeFactory.builder();
     Map<String, LukeResponse.FieldInfo> luceneFieldInfoMap = getFieldInfo(collection);
+
     for(Map.Entry<String, LukeResponse.FieldInfo> entry : luceneFieldInfoMap.entrySet()) {
       LukeResponse.FieldInfo luceneFieldInfo = entry.getValue();
 
@@ -110,13 +111,17 @@ class SolrSchema extends AbstractSchema {
           type = typeFactory.createJavaType(String.class);
       }
 
-      EnumSet<FieldFlag> flags = luceneFieldInfo.getFlags();
+      EnumSet<FieldFlag> flags = luceneFieldInfo.parseFlags(luceneFieldInfo.getSchema());
+      /*
       if(flags != null && flags.contains(FieldFlag.MULTI_VALUED)) {
         type = typeFactory.createArrayType(type, -1);
       }
+      */
 
       fieldInfo.add(entry.getKey(), type).nullable(true);
     }
+    fieldInfo.add("_query_",typeFactory.createJavaType(String.class));
+    fieldInfo.add("score",typeFactory.createJavaType(Double.class));
 
     return RelDataTypeImpl.proto(fieldInfo.build());
   }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ec6ee96a/solr/core/src/java/org/apache/solr/handler/sql/SolrTable.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/sql/SolrTable.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrTable.java
index 5f64231..6784323 100644
--- a/solr/core/src/java/org/apache/solr/handler/sql/SolrTable.java
+++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrTable.java
@@ -32,6 +32,9 @@ import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
 import org.apache.solr.client.solrj.io.comp.FieldComparator;
 import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
 import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
+import org.apache.solr.client.solrj.io.eq.MultipleFieldEqualitor;
+import org.apache.solr.client.solrj.io.eq.StreamEqualitor;
 import org.apache.solr.client.solrj.io.ops.AndOperation;
 import org.apache.solr.client.solrj.io.ops.BooleanOperation;
 import org.apache.solr.client.solrj.io.ops.EqualsOperation;
@@ -216,10 +219,10 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
     }
   }
 
-  private List<Metric> buildMetrics(List<Pair<String, String>> metricPairs) {
+  private List<Metric> buildMetrics(List<Pair<String, String>> metricPairs, boolean ifEmptyCount) {
     List<Metric> metrics = new ArrayList<>(metricPairs.size());
     metrics.addAll(metricPairs.stream().map(this::getMetric).collect(Collectors.toList()));
-    if(metrics.size() == 0) {
+    if(metrics.size() == 0 && ifEmptyCount) {
       metrics.add(new CountMetric());
     }
     return metrics;
@@ -253,15 +256,35 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
     ModifiableSolrParams params = new ModifiableSolrParams();
     params.add(CommonParams.Q, query);
 
+    //Validate the fields
+    for(Map.Entry<String, Class> entry : fields) {
+      String fname = entry.getKey();
+      if(limit == null && "score".equals(fname)) {
+        throw new IOException("score is not a valid field for unlimited queries.");
+      }
+
+      if(fname.contains("*")) {
+        throw new IOException("* is not supported for column selection.");
+      }
+    }
+
+    String fl = getFields(fields);
+
     if(orders.size() > 0) {
       params.add(CommonParams.SORT, getSort(orders));
     } else {
-      params.add(CommonParams.SORT, "_version_ desc");
+      if(limit == null) {
+        params.add(CommonParams.SORT, "_version_ desc");
+        fl = fl+",_version_";
+      } else {
+        params.add(CommonParams.SORT, "score desc");
+        if(fl.indexOf("score") == -1) {
+          fl = fl + ",score";
+        }
+      }
     }
 
-    if(fields.size() > 0) {
-      params.add(CommonParams.FL, getFields(fields));
-    }
+    params.add(CommonParams.FL, fl);
 
     if (limit != null) {
       params.add(CommonParams.ROWS, limit);
@@ -284,26 +307,23 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
     return buf.toString();
   }
 
+  private String getSingleSort(Pair<String, String> order) {
+    StringBuilder buf = new StringBuilder();
+    buf.append(order.getKey()).append(" ").append(order.getValue());
+    return buf.toString();
+  }
+
   private String getFields(List<Map.Entry<String, Class>> fields) {
     StringBuilder buf = new StringBuilder();
-    boolean appendVersion = true;
     for(Map.Entry<String, Class> field : fields) {
 
       if(buf.length() > 0) {
         buf.append(",");
       }
 
-      if(field.getKey().equals("_version_")) {
-        appendVersion = false;
-      }
-
       buf.append(field.getKey());
     }
 
-    if(appendVersion){
-      buf.append(",_version_");
-    }
-
     return buf.toString();
   }
 
@@ -420,7 +440,11 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
     int numWorkers = Integer.parseInt(properties.getProperty("numWorkers", "1"));
 
     Bucket[] buckets = buildBuckets(_buckets, fields);
-    Metric[] metrics = buildMetrics(metricPairs).toArray(new Metric[0]);
+    Metric[] metrics = buildMetrics(metricPairs, false).toArray(new Metric[0]);
+
+    if(metrics.length == 0) {
+      return handleSelectDistinctMapReduce(zk, collection, properties, fields, query, orders, buckets, limit);
+    }
 
     Set<String> fieldSet = getFieldSet(metrics, fields);
 
@@ -527,7 +551,6 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
     return bucketsArray;
   }
 
-
   private TupleStream handleGroupByFacet(String zkHost,
                                          String collection,
                                          final List<Map.Entry<String, Class>> fields,
@@ -542,13 +565,13 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
     solrParams.add(CommonParams.Q, query);
 
     Bucket[] buckets = buildBuckets(bucketFields, fields);
-    Metric[] metrics = buildMetrics(metricPairs).toArray(new Metric[0]);
+    Metric[] metrics = buildMetrics(metricPairs, true).toArray(new Metric[0]);
     if(metrics.length == 0) {
       metrics = new Metric[1];
       metrics[0] = new CountMetric();
     }
 
-    int limit = lim != null ? Integer.parseInt(lim) : 100;
+    int limit = lim != null ? Integer.parseInt(lim) : 1000;
 
     FieldComparator[] sorts = null;
 
@@ -561,13 +584,15 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
       sorts = getComps(orders);
     }
 
+    int overfetch = (int)(limit * 1.25);
+
     TupleStream tupleStream = new FacetStream(zkHost,
                                               collection,
                                               solrParams,
                                               buckets,
                                               metrics,
                                               sorts,
-                                              limit);
+                                              overfetch);
 
 
 
@@ -602,30 +627,144 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
     return tupleStream;
   }
 
-  private TupleStream handleSelectDistinctMapReduce(final Properties properties,
+  private TupleStream handleSelectDistinctMapReduce(final String zkHost,
+                                                    final String collection,
+                                                    final Properties properties,
                                                     final List<Map.Entry<String, Class>> fields,
                                                     final String query,
                                                     final List<Pair<String, String>> orders,
-                                                    final List<String> buckets,
-                                                    final List<Pair<String, String>> metricPairs,
-                                                    final String limit) {
+                                                    final Bucket[] buckets,
+                                                    final String limit) throws IOException{
+
+    int numWorkers = Integer.parseInt(properties.getProperty("numWorkers", "1"));
+
+    String fl = getFields(fields);
+
+    String sort = null;
+    StreamEqualitor ecomp = null;
+    StreamComparator comp = null;
+
+    if(orders != null && orders.size() > 0) {
+      StreamComparator[] adjustedSorts = adjustSorts(orders, buckets);
+      // Because of the way adjustSorts works we know that each FieldComparator has a single
+      // field name. For this reason we can just look at the leftFieldName
+      FieldEqualitor[] fieldEqualitors = new FieldEqualitor[adjustedSorts.length];
+      StringBuilder buf = new StringBuilder();
+      for(int i=0; i<adjustedSorts.length; i++) {
+        FieldComparator fieldComparator = (FieldComparator)adjustedSorts[i];
+        fieldEqualitors[i] = new FieldEqualitor(fieldComparator.getLeftFieldName());
+        if(i>0) {
+          buf.append(",");
+        }
+        buf.append(fieldComparator.getLeftFieldName()).append(" ").append(fieldComparator.getOrder().toString());
+      }
+
+      sort = buf.toString();
+
+      if(adjustedSorts.length == 1) {
+        ecomp = fieldEqualitors[0];
+        comp = adjustedSorts[0];
+      } else {
+        ecomp = new MultipleFieldEqualitor(fieldEqualitors);
+        comp = new MultipleFieldComparator(adjustedSorts);
+      }
+    } else {
+      StringBuilder sortBuf = new StringBuilder();
+      FieldEqualitor[] equalitors = new FieldEqualitor[buckets.length];
+      StreamComparator[] streamComparators = new StreamComparator[buckets.length];
+      for(int i=0; i<buckets.length; i++) {
+        equalitors[i] = new FieldEqualitor(buckets[i].toString());
+        streamComparators[i] = new FieldComparator(buckets[i].toString(), ComparatorOrder.ASCENDING);
+        if(i>0) {
+          sortBuf.append(',');
+        }
+        sortBuf.append(buckets[i].toString()).append(" asc");
+      }
 
+      sort = sortBuf.toString();
 
+      if(equalitors.length == 1) {
+        ecomp = equalitors[0];
+        comp = streamComparators[0];
+      } else {
+        ecomp = new MultipleFieldEqualitor(equalitors);
+        comp = new MultipleFieldComparator(streamComparators);
+      }
+    }
 
+    ModifiableSolrParams params = new ModifiableSolrParams();
 
+    params.set(CommonParams.FL, fl);
+    params.set(CommonParams.Q, query);
+    //Always use the /export handler for Distinct Queries because it requires exporting full result sets.
+    params.set(CommonParams.QT, "/export");
 
+    if(numWorkers > 1) {
+      params.set("partitionKeys", getPartitionKeys(buckets));
+    }
 
-    return null;
+    params.set("sort", sort);
+
+    TupleStream tupleStream = null;
+
+    CloudSolrStream cstream = new CloudSolrStream(zkHost, collection, params);
+    tupleStream = new UniqueStream(cstream, ecomp);
+
+    if(numWorkers > 1) {
+      // Do the unique in parallel
+      // Maintain the sort of the Tuples coming from the workers.
+      ParallelStream parallelStream = new ParallelStream(zkHost, collection, tupleStream, numWorkers, comp);
+
+      StreamFactory factory = new StreamFactory()
+          .withFunctionName("search", CloudSolrStream.class)
+          .withFunctionName("parallel", ParallelStream.class)
+          .withFunctionName("unique", UniqueStream.class);
+
+      parallelStream.setStreamFactory(factory);
+      tupleStream = parallelStream;
+    }
+
+    if(limit != null) {
+      tupleStream = new LimitStream(tupleStream, Integer.parseInt(limit));
+    }
+
+    return tupleStream;
   }
 
-  private TupleStream handleSelectDistinctFacet(final Properties properties,
-                                                final List<Map.Entry<String, Class>> fields,
-                                                final String query,
-                                                final List<Pair<String, String>> orders,
-                                                final List<String> buckets,
-                                                final List<Pair<String, String>> metricPairs,
-                                                final String limit) {
-    return null;
+
+  private StreamComparator[] adjustSorts(List<Pair<String, String>> orders, Bucket[] buckets) throws IOException {
+    List<FieldComparator> adjustedSorts = new ArrayList();
+    Set<String> bucketFields = new HashSet();
+    Set<String> sortFields = new HashSet();
+
+    ComparatorOrder comparatorOrder = ComparatorOrder.ASCENDING;
+    for(Pair<String, String> order : orders) {
+      sortFields.add(order.getKey());
+      adjustedSorts.add(new FieldComparator(order.getKey(), ascDescComp(order.getValue())));
+      comparatorOrder = ascDescComp(order.getValue());
+    }
+
+    for(Bucket bucket : buckets) {
+      bucketFields.add(bucket.toString());
+    }
+
+    for(String sf : sortFields) {
+      if(!bucketFields.contains(sf)) {
+        throw new IOException("All sort fields must be in the field list.");
+      }
+    }
+
+    //Add sort fields if needed
+    if(sortFields.size() < buckets.length) {
+      for(Bucket bucket : buckets) {
+        String b = bucket.toString();
+        if(!sortFields.contains(b)) {
+          adjustedSorts.add(new FieldComparator(bucket.toString(), comparatorOrder));
+        }
+      }
+    }
+
+    return adjustedSorts.toArray(new FieldComparator[adjustedSorts.size()]);
   }
 
   private TupleStream handleStats(String zk,
@@ -636,7 +775,7 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
 
     ModifiableSolrParams solrParams = new ModifiableSolrParams();
     solrParams.add(CommonParams.Q, query);
-    Metric[] metrics = buildMetrics(metricPairs).toArray(new Metric[0]);
+    Metric[] metrics = buildMetrics(metricPairs, false).toArray(new Metric[0]);
     return new StatsStream(zk, collection, solrParams, metrics);
   }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ec6ee96a/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java b/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java
index 605abf5..35f7ad0 100644
--- a/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java
+++ b/solr/core/src/test/org/apache/solr/handler/TestSQLHandler.java
@@ -115,7 +115,6 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
       List<Tuple> tuples = getTuples(solrStream);
 
       assert(tuples.size() == 8);
-
       Tuple tuple;
 
       tuple = tuples.get(0);
@@ -478,7 +477,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
       indexDoc(sdoc("id", "8", "Text_t", "XXXX XXXX", "Str_s", "c", "Field_i", "60"));
       commit();
 
-      SolrParams sParams = mapParams(CommonParams.QT, "/sql",
+      SolrParams sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "map_reduce",
           "stmt", "select id, Field_i, Str_s from collection1 where Text_t='XXXX' order by Field_i desc");
 
       SolrStream solrStream = new SolrStream(jetty.url, sParams);
@@ -545,7 +544,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
       assert(tuple.get("Str_s").equals("a"));
       assert(tuple.getDouble("EXPR$1") == 7);
 
-      sParams = mapParams(CommonParams.QT, "/sql",
+      sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "map_reduce",
         "stmt", "select Str_s, sum(Field_i) from collection1 where id='(1 8)' group by Str_s having (sum(Field_i) = 7 OR sum(Field_i) = 60) order by sum(Field_i) desc");
 
       solrStream = new SolrStream(jetty.url, sParams);
@@ -584,7 +583,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
       indexDoc(sdoc("id", "8", "text", "XXXX XXXX", "str_s", "c", "field_i", "60"));
       commit();
 
-      SolrParams sParams = mapParams(CommonParams.QT, "/sql",
+      SolrParams sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "map_reduce",
           "stmt", "select id, str_s from collection1 where text='XXXX' order by field_iff desc");
 
       SolrStream solrStream = new SolrStream(jetty.url, sParams);
@@ -603,7 +602,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
 
       assert(tuple.getException().contains("Column 'field_iff' not found in any table"));
 
-      sParams = mapParams(CommonParams.QT, "/sql",
+      sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "map_reduce",
           "stmt", "select str_s, count(*), sum(field_iff), min(field_i), max(field_i), cast(avg(1.0 * field_i) as float) from collection1 where text='XXXX' group by str_s having ((sum(field_iff) = 19) AND (min(field_i) = 8))");
 
       solrStream = new SolrStream(jetty.url, sParams);
@@ -612,7 +611,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
       assert(tuple.EXCEPTION);
       assert(tuple.getException().contains("Column 'field_iff' not found in any table"));
 
-      sParams = mapParams(CommonParams.QT, "/sql",
+      sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "map_reduce",
           "stmt", "select str_s, count(*), blah(field_i), min(field_i), max(field_i), cast(avg(1.0 * field_i) as float) from collection1 where text='XXXX' group by str_s having ((sum(field_i) = 19) AND (min(field_i) = 8))");
 
       solrStream = new SolrStream(jetty.url, sParams);
@@ -645,7 +644,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
       indexr("id", "9", "text", "XXXX XXXY", "str_s", "d", "field_i", "70");
       commit();
 
-      SolrParams sParams = mapParams(CommonParams.QT, "/sql",
+      SolrParams sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "map_reduce",
         "stmt", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), cast(avg(1.0 * field_i) as float) from collection1 where text='XXXX' group by str_s order by sum(field_i) asc limit 2");
 
       SolrStream solrStream = new SolrStream(jetty.url, sParams);
@@ -653,7 +652,6 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
 
       //Only two results because of the limit.
       assert(tuples.size() == 2);
-
       Tuple tuple;
 
       tuple = tuples.get(0);
@@ -672,7 +670,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
       assert(tuple.getDouble("EXPR$4") == 20); //max(field_i)
       assert(tuple.getDouble("EXPR$5") == 13.5D); //avg(field_i)
 
-      sParams = mapParams(CommonParams.QT, "/sql",
+      sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "map_reduce",
           "stmt", "select str_s as myString, count(*), sum(field_i) as mySum, min(field_i), max(field_i), cast(avg(1.0 * field_i) as float) from collection1 where text='XXXX' group by str_s order by mySum asc limit 2");
 
       solrStream = new SolrStream(jetty.url, sParams);
@@ -697,7 +695,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
       assert(tuple.getDouble("EXPR$4") == 20); //max(field_i)
       assert(tuple.getDouble("EXPR$5") == 13.5D); //avg(field_i)
 
-      sParams = mapParams(CommonParams.QT, "/sql",
+      sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "map_reduce",
         "stmt", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), "
           + "cast(avg(1.0 * field_i) as float) from collection1 where (text='XXXX' AND NOT ((text='XXXY') AND (text='XXXY' OR text='XXXY'))) "
           + "group by str_s order by str_s desc");
@@ -735,7 +733,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
 
       assert(tuple.getDouble("EXPR$5") == 13.5D); //avg(field_i)
 
-      sParams = mapParams(CommonParams.QT, "/sql",
+      sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "map_reduce",
           "stmt", "select str_s as myString, count(*) as myCount, sum(field_i) as mySum, min(field_i) as myMin, "
           + "max(field_i) as myMax, cast(avg(1.0 * field_i) as float) as myAvg from collection1 "
           + "where (text='XXXX' AND NOT (text='XXXY')) group by str_s order by str_s desc");
@@ -772,7 +770,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
       assert(tuple.getDouble("myMax") == 20);
       assert(tuple.getDouble("myAvg") == 13.5D);
 
-      sParams = mapParams(CommonParams.QT, "/sql",
+      sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "map_reduce",
           "stmt", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), cast(avg(1.0 * field_i) as float) " +
           "from collection1 where text='XXXX' group by str_s having sum(field_i) = 19");
 
@@ -789,7 +787,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
       assert(tuple.getDouble("EXPR$4") == 11); //max(field_i)
       assert(tuple.getDouble("EXPR$5") == 9.5D); //avg(field_i)
 
-      sParams = mapParams(CommonParams.QT, "/sql",
+      sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "map_reduce",
           "stmt", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), cast(avg(1.0 * field_i) as float) " +
           "from collection1 where text='XXXX' group by str_s having ((sum(field_i) = 19) AND (min(field_i) = 8))");
 
@@ -806,7 +804,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
       assert(tuple.getDouble("EXPR$4") == 11); //max(field_i)
       assert(tuple.getDouble("EXPR$5") == 9.5D); //avg(field_i)
 
-      sParams = mapParams(CommonParams.QT, "/sql",
+      sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "map_reduce",
           "stmt", "select str_s, count(*), sum(field_i) as mySum, min(field_i), max(field_i), " +
           "cast(avg(1.0 * field_i) as float) from collection1 where text='XXXX' group by str_s " +
           "having ((sum(field_i) = 19) AND (min(field_i) = 8))");
@@ -824,7 +822,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
       assert(tuple.getDouble("EXPR$4") == 11); //max(field_i)
       assert(tuple.getDouble("EXPR$5") == 9.5D); //avg(field_i)
 
-      sParams = mapParams(CommonParams.QT, "/sql",
+      sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "map_reduce",
           "stmt", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), " +
           "cast(avg(1.0 * field_i) as float) from collection1 where text='XXXX' group by str_s " +
           "having ((sum(field_i) = 19) AND (min(field_i) = 100))");
@@ -1063,7 +1061,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
       indexr("id", "8", "text", "XXXX XXXX", "str_s", "c", "field_i", "60");
       commit();
 
-      SolrParams sParams = mapParams(CommonParams.QT, "/sql",
+      SolrParams sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "map_reduce",
           "stmt", "select distinct str_s, field_i from collection1 order by str_s asc, field_i asc");
 
       System.out.println("##################### testSelectDistinct()");
@@ -1071,8 +1069,8 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
       TupleStream solrStream = new SolrStream(jetty.url, sParams);
       List<Tuple> tuples = getTuples(solrStream);
 
-      assert(tuples.size() == 6);
 
+      assert(tuples.size() == 6);
       Tuple tuple = tuples.get(0);
       assert(tuple.get("str_s").equals("a"));
       assert(tuple.getLong("field_i") == 1);
@@ -1099,7 +1097,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
 
 
       //reverse the sort
-      sParams = mapParams(CommonParams.QT, "/sql",
+      sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "map_reduce",
         "stmt", "select distinct str_s, field_i from collection1 order by str_s desc, field_i desc");
 
       solrStream = new SolrStream(jetty.url, sParams);
@@ -1134,7 +1132,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
       assert(tuple.getLong("field_i") == 1);
 
 
-      sParams = mapParams(CommonParams.QT, "/sql",
+      sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "map_reduce",
         "stmt", "select distinct str_s as myString, field_i from collection1 order by myString desc, field_i desc");
 
       solrStream = new SolrStream(jetty.url, sParams);
@@ -1170,7 +1168,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
 
 
       //test with limit
-      sParams = mapParams(CommonParams.QT, "/sql",
+      sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "map_reduce",
         "stmt", "select distinct str_s, field_i from collection1 order by str_s desc, field_i desc limit 2");
 
       solrStream = new SolrStream(jetty.url, sParams);
@@ -1188,7 +1186,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
 
 
       // Test without a sort. Sort should be asc by default.
-      sParams = mapParams(CommonParams.QT, "/sql",
+      sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "map_reduce",
           "stmt", "select distinct str_s, field_i from collection1");
 
       solrStream = new SolrStream(jetty.url, sParams);
@@ -1221,7 +1219,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
       assert(tuple.getLong("field_i") == 60);
 
       // Test with a predicate.
-      sParams = mapParams(CommonParams.QT, "/sql",
+      sParams = mapParams(CommonParams.QT, "/sql", "aggregationMode", "map_reduce",
           "stmt", "select distinct str_s, field_i from collection1 where str_s = 'a'");
 
       solrStream = new SolrStream(jetty.url, sParams);
@@ -1258,7 +1256,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
       indexr("id", "7", "text", "XXXX XXXX", "str_s", "c", "field_i", "50");
       indexr("id", "8", "text", "XXXX XXXX", "str_s", "c", "field_i", "60");
       commit();
-      SolrParams sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2",
+      SolrParams sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2", "aggregationMode", "map_reduce",
         "stmt", "select distinct str_s, field_i from collection1 order by str_s asc, field_i asc");
 
       SolrStream solrStream = new SolrStream(jetty.url, sParams);
@@ -1294,7 +1292,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
 
 
       //reverse the sort
-      sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2",
+      sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2", "aggregationMode", "map_reduce",
         "stmt", "select distinct str_s, field_i from collection1 order by str_s desc, field_i desc");
 
       solrStream = new SolrStream(jetty.url, sParams);
@@ -1328,7 +1326,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
 
 
       //reverse the sort
-      sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2",
+      sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2", "aggregationMode", "map_reduce",
         "stmt", "select distinct str_s as myString, field_i from collection1 order by myString desc, field_i desc");
 
       solrStream = new SolrStream(jetty.url, sParams);
@@ -1364,7 +1362,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
 
 
       //test with limit
-      sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2",
+      sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2", "aggregationMode", "map_reduce",
         "stmt", "select distinct str_s, field_i from collection1 order by str_s desc, field_i desc limit 2");
 
       solrStream = new SolrStream(jetty.url, sParams);
@@ -1382,7 +1380,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
 
 
       // Test without a sort. Sort should be asc by default.
-      sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2",
+      sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2", "aggregationMode", "map_reduce",
           "stmt", "select distinct str_s, field_i from collection1");
 
       solrStream = new SolrStream(jetty.url, sParams);
@@ -1415,7 +1413,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
       assert(tuple.getLong("field_i") == 60);
 
       // Test with a predicate.
-      sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2",
+      sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2", "aggregationMode", "map_reduce",
           "stmt", "select distinct str_s, field_i from collection1 where str_s = 'a'");
 
       solrStream = new SolrStream(jetty.url, sParams);
@@ -1643,7 +1641,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
       indexr("id", "8", "text", "XXXX XXXX", "str_s", "c", "field_i", "60");
       commit();
 
-      SolrParams sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2",
+      SolrParams sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2", "aggregationMode", "map_reduce",
           "stmt", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), " +
           "cast(avg(1.0 * field_i) as float) from collection1 where text='XXXX' group by str_s " +
           "order by sum(field_i) asc limit 2");
@@ -1673,7 +1671,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
       assert(tuple.getDouble("EXPR$5") == 13.5D); //avg(field_i)
 
 
-      sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2",
+      sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2", "aggregationMode", "map_reduce",
           "stmt", "select str_s, count(*), sum(field_i) as mySum, min(field_i), max(field_i), " +
           "cast(avg(1.0 * field_i) as float) from collection1 where text='XXXX' group by str_s order by mySum asc limit 2");
 
@@ -1700,7 +1698,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
       assert(tuple.getDouble("EXPR$5") == 13.5D); //avg(field_i)
 
 
-      sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2",
+      sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2", "aggregationMode", "map_reduce",
           "stmt", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), " +
           "cast(avg(1.0 * field_i) as float) from collection1 where text='XXXX' group by str_s order by str_s desc");
 
@@ -1737,7 +1735,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
       assert(tuple.getDouble("EXPR$5") == 13.5D); //avg(field_i)
 
 
-      sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2",
+      sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2", "aggregationMode", "map_reduce",
           "stmt", "select str_s as myString, count(*), sum(field_i), min(field_i), max(field_i), " +
           "cast(avg(1.0 * field_i) as float) from collection1 where text='XXXX' group by str_s order by myString desc");
 
@@ -1774,7 +1772,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
       assert(tuple.getDouble("EXPR$5") == 13.5D); //avg(field_i)
 
 
-      sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2",
+      sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2", "aggregationMode", "map_reduce",
           "stmt", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), " +
           "cast(avg(1.0 * field_i) as float) from collection1 where text='XXXX' group by str_s having sum(field_i) = 19");
 
@@ -1791,7 +1789,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
       assert(tuple.getDouble("EXPR$4") == 11); //max(field_i)
       assert(tuple.getDouble("EXPR$5") == 9.5D); //avg(field_i)
 
-      sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2",
+      sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2", "aggregationMode", "map_reduce",
           "stmt", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), " +
           "cast(avg(1.0 * field_i) as float) from collection1 where text='XXXX' group by str_s " +
           "having ((sum(field_i) = 19) AND (min(field_i) = 8))");
@@ -1809,7 +1807,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
       assert(tuple.getDouble("EXPR$4") == 11); //max(field_i)
       assert(tuple.getDouble("EXPR$5") == 9.5D); //avg(field_i)
 
-      sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2",
+      sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2", "aggregationMode", "map_reduce",
           "stmt", "select str_s, count(*), sum(field_i), min(field_i), max(field_i), " +
           "cast(avg(1.0 * field_i) as float) from collection1 where text='XXXX' group by str_s " +
           "having ((sum(field_i) = 19) AND (min(field_i) = 100))");
@@ -2224,7 +2222,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
       indexr("id", "8", "year_i", "2014", "month_i", "4", "day_i", "2", "item_i", "1");
 
       commit();
-      SolrParams sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2", 
+      SolrParams sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2", "aggregationMode", "map_reduce",
           "stmt", "select year_i, sum(item_i) from collection1 group by year_i order by year_i desc");
 
       SolrStream solrStream = new SolrStream(jetty.url, sParams);
@@ -2243,7 +2241,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
       assert(tuple.getLong("year_i") == 2014);
       assert(tuple.getDouble("EXPR$1") == 7); //sum(item_i)
 
-      sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2",
+      sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2", "aggregationMode", "map_reduce",
           "stmt", "select year_i, month_i, sum(item_i) from collection1 group by year_i, month_i " +
           "order by year_i desc, month_i desc");
 
@@ -2270,7 +2268,7 @@ public class TestSQLHandler extends AbstractFullDistribZkTestBase {
       assert(tuple.getDouble("EXPR$2") == 7); //sum(item_i)
 
 
-      sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2",
+      sParams = mapParams(CommonParams.QT, "/sql", "numWorkers", "2", "aggregationMode", "map_reduce",
           "stmt", "select year_i, month_i, day_i, sum(item_i) from collection1 group by year_i, month_i, day_i " +
           "order by year_i desc, month_i desc, day_i desc");
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ec6ee96a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JDBCStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JDBCStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JDBCStream.java
index 143143f..0f95103 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JDBCStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JDBCStream.java
@@ -17,6 +17,7 @@
 package org.apache.solr.client.solrj.io.stream;
 
 import java.io.IOException;
+import java.sql.Array;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
@@ -88,6 +89,7 @@ public class JDBCStream extends TupleStream implements Expressible {
   private ResultSetValueSelector[] valueSelectors;
   protected ResultSet resultSet;
   protected transient StreamContext streamContext;
+  protected String sep = Character.toString((char)31);
 
   public JDBCStream(String connectionUrl, String sqlQuery, StreamComparator definedSort) throws IOException {
     this(connectionUrl, sqlQuery, definedSort, null, null);
@@ -231,12 +233,20 @@ public class JDBCStream extends TupleStream implements Expressible {
       final String columnName = metadata.getColumnLabel(columnNumber);
       String className = metadata.getColumnClassName(columnNumber);
       String typeName = metadata.getColumnTypeName(columnNumber);
-            
+      
       if(directSupportedTypes.contains(className)){
         valueSelectors[columnIdx] = new ResultSetValueSelector() {
           public Object selectValue(ResultSet resultSet) throws SQLException {
             Object obj = resultSet.getObject(columnNumber);
             if(resultSet.wasNull()){ return null; }
+            if(obj instanceof String) {
+              String s = (String)obj;
+              if(s.indexOf(sep) > -1) {
+                s = s.substring(1);
+                return s.split(sep);
+              }
+            }
+
             return obj;
           }
           public String getColumnName() {
@@ -276,6 +286,22 @@ public class JDBCStream extends TupleStream implements Expressible {
             return columnName;
           }
         };
+      } else if(Array.class.getName().equals(className)) {
+        valueSelectors[columnIdx] = new ResultSetValueSelector() {
+          public Object selectValue(ResultSet resultSet) throws SQLException {
+            Object o = resultSet.getObject(columnNumber);
+            if(resultSet.wasNull()){ return null; }
+            if(o instanceof Array) {
+              Array array = (Array)o;
+              return array.getArray();
+            } else {
+              return o;
+            }
+          }
+          public String getColumnName() {
+            return columnName;
+          }
+        };
       } else {
         throw new SQLException(String.format(Locale.ROOT,
             "Unable to determine the valueSelector for column '%s' (col #%d) of java class '%s' and type '%s'",