Posted to commits@calcite.apache.org by jh...@apache.org on 2017/11/29 06:13:12 UTC

[3/4] calcite git commit: [CALCITE-1616] Data profiler

[CALCITE-1616] Data profiler

The profiler does not work on JDK 7 because it relies on
com.yahoo.datasketches:sketches-core:0.9.0, which requires JDK 8 or
higher. Therefore some tests are disabled on JDK 7.

Change the LatticeStatisticProvider interface to estimate joint
cardinality (of a list of columns) rather than singleton cardinality
(of one column at a time).
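
For orientation, the before-and-after signatures are shown here (both are
taken from the LatticeStatisticProvider.java hunk further below); the final
commented-out call is only an illustrative sketch, and "provider",
"stateColumn" and "cityColumn" are hypothetical names:

  // Before: one column at a time, returning int.
  int cardinality(Lattice lattice, Lattice.Column column);

  // After: joint cardinality over a list of columns, returning double.
  double cardinality(List<Lattice.Column> columns);

  // Hypothetical caller asking for the joint cardinality of two columns:
  //   double d = provider.cardinality(ImmutableList.of(stateColumn, cityColumn));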

Bind LatticeStatisticProvider to a Lattice, and create
LatticeStatisticProvider.Factory to do the binding.
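
A Factory is simply a Guava Function<Lattice, LatticeStatisticProvider>, so
the Lattice constructor can call factory.apply(this) and keep the resulting
provider. Below is a minimal sketch modeled on the anonymous factories in
SqlLatticeStatisticProvider further down; MyStatisticProvider is a
hypothetical implementation, not part of this commit:

  static final LatticeStatisticProvider.Factory FACTORY =
      new LatticeStatisticProvider.Factory() {
        public LatticeStatisticProvider apply(Lattice lattice) {
          // The provider captures the lattice it will describe.
          return new MyStatisticProvider(lattice);  // hypothetical class
        }
      };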

Keep results only if they are sufficiently "surprising" (informative).
The "surprise queue" is FIFO and holds the top N accepted values
after a warm-up period.
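
The SurpriseQueue class itself is not in this part of the patch, so the
sketch below is only an illustration of the behavior described above and in
the ProfilerImpl javadoc (accept everything during warm-up, then accept only
values that beat the values already held, evicting the oldest). The class
name, field names and the use of the median as the acceptance bar are
assumptions:

  import java.util.ArrayDeque;
  import java.util.ArrayList;
  import java.util.Collections;
  import java.util.Deque;
  import java.util.List;

  /** Illustrative sketch only; not the committed SurpriseQueue. */
  class SurpriseQueueSketch {
    final int warmUpCount;
    final int capacity;
    final Deque<Double> deque = new ArrayDeque<>();
    int count;

    SurpriseQueueSketch(int warmUpCount, int capacity) {
      this.warmUpCount = warmUpCount;
      this.capacity = capacity;
    }

    /** Returns whether the value was accepted; accepted values enter the FIFO. */
    boolean offer(double surprise) {
      final boolean accept = count++ < warmUpCount || surprise > median();
      if (accept) {
        if (deque.size() >= capacity) {
          deque.removeFirst();  // FIFO: drop the oldest accepted value
        }
        deque.addLast(surprise);
      }
      return accept;
    }

    double median() {
      final List<Double> sorted = new ArrayList<>(deque);
      Collections.sort(sorted);
      return sorted.isEmpty() ? 0d : sorted.get(sorted.size() / 2);
    }
  }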

PartiallyOrderedSet: find "hypothetical" parents and children of
elements not in the set.
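
In ProfilerImpl below this surfaces as the boolean second argument to
getParents and getChildren: passing "true" asks the poset to answer for an
element that was never added. A short usage sketch follows; the bit sets are
illustrative, while the poset construction and the two calls mirror the
patch:

  final PartiallyOrderedSet<ImmutableBitSet> poset =
      new PartiallyOrderedSet<>(PartiallyOrderedSet.BIT_SET_INCLUSION_ORDERING);
  poset.add(ImmutableBitSet.of(0));
  poset.add(ImmutableBitSet.of(0, 1, 2));
  // {0, 1} is not in the set, but with "true" the poset still reports the
  // elements that would sit immediately adjacent to it in the inclusion order.
  for (ImmutableBitSet parent : poset.getParents(ImmutableBitSet.of(0, 1), true)) {
    System.out.println("hypothetical parent: " + parent);
  }
  for (ImmutableBitSet child : poset.getChildren(ImmutableBitSet.of(0, 1), true)) {
    System.out.println("hypothetical child: " + child);
  }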


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/dad58186
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/dad58186
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/dad58186

Branch: refs/heads/master
Commit: dad581868b815510389f61936b0c583be93d5dc5
Parents: 01eb6b5
Author: Julian Hyde <jh...@apache.org>
Authored: Wed Feb 1 19:43:46 2017 -0800
Committer: Julian Hyde <jh...@apache.org>
Committed: Tue Nov 28 10:20:25 2017 -0800

----------------------------------------------------------------------
 core/pom.xml                                    |   4 +
 .../CachingLatticeStatisticProvider.java        |  40 +-
 .../DelegatingLatticeStatisticProvider.java     |   6 +-
 .../org/apache/calcite/materialize/Lattice.java | 196 +++--
 .../materialize/LatticeStatisticProvider.java   |  14 +-
 .../apache/calcite/materialize/Lattices.java    |  16 +-
 .../ProfilerLatticeStatisticProvider.java       | 105 +++
 .../SqlLatticeStatisticProvider.java            |  40 +-
 .../org/apache/calcite/profile/Profiler.java    | 294 +++++++
 .../apache/calcite/profile/ProfilerImpl.java    | 809 +++++++++++++++++++
 .../apache/calcite/profile/SimpleProfiler.java  | 341 ++++++++
 .../apache/calcite/profile/package-info.java    |  26 +
 .../calcite/util/PartiallyOrderedSet.java       | 215 +++--
 .../apache/calcite/profile/ProfilerTest.java    | 682 ++++++++++++++++
 .../org/apache/calcite/test/CalciteSuite.java   |   2 +
 .../test/FoodMartLatticeStatisticProvider.java  |  30 +-
 .../org/apache/calcite/test/LatticeTest.java    | 117 ++-
 .../java/org/apache/calcite/test/Matchers.java  |  81 ++
 .../calcite/util/PartiallyOrderedSetTest.java   |  78 +-
 .../java/org/apache/calcite/util/TestUtil.java  |  17 +
 .../apache/calcite/adapter/tpch/TpchTest.java   |   6 +-
 pom.xml                                         |  12 +-
 22 files changed, 2928 insertions(+), 203 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/dad58186/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index 73c1866..5526122 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -89,6 +89,10 @@ limitations under the License.
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>com.yahoo.datasketches</groupId>
+      <artifactId>sketches-core</artifactId>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/calcite/blob/dad58186/core/src/main/java/org/apache/calcite/materialize/CachingLatticeStatisticProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/materialize/CachingLatticeStatisticProvider.java b/core/src/main/java/org/apache/calcite/materialize/CachingLatticeStatisticProvider.java
index e408335..21c6fed 100644
--- a/core/src/main/java/org/apache/calcite/materialize/CachingLatticeStatisticProvider.java
+++ b/core/src/main/java/org/apache/calcite/materialize/CachingLatticeStatisticProvider.java
@@ -16,43 +16,51 @@
  */
 package org.apache.calcite.materialize;
 
-import org.apache.calcite.util.Pair;
 import org.apache.calcite.util.Util;
 
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.UncheckedExecutionException;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.ExecutionException;
+import javax.annotation.Nonnull;
 
 /**
- * Implementation of {@link LatticeStatisticProvider} that gets statistics by
- * executing "SELECT COUNT(DISTINCT ...) ..." SQL queries.
+ * Implementation of {@link LatticeStatisticProvider} that caches single-column
+ * statistics and computes multi-column statistics from these.
  */
 class CachingLatticeStatisticProvider implements LatticeStatisticProvider {
-  private final LoadingCache<Pair<Lattice, Lattice.Column>, Integer> cache;
+  private final Lattice lattice;
+  private final LoadingCache<Lattice.Column, Double> cache;
 
   /** Creates a CachingStatisticProvider. */
-  CachingLatticeStatisticProvider(
+  CachingLatticeStatisticProvider(final Lattice lattice,
       final LatticeStatisticProvider provider) {
-    cache = CacheBuilder.<Pair<Lattice, Lattice.Column>>newBuilder()
+    this.lattice = lattice;
+    cache = CacheBuilder.<Lattice.Column>newBuilder()
         .build(
-            new CacheLoader<Pair<Lattice, Lattice.Column>, Integer>() {
-              public Integer load(Pair<Lattice, Lattice.Column> key)
-                  throws Exception {
-                return provider.cardinality(key.left, key.right);
+            new CacheLoader<Lattice.Column, Double>() {
+              public Double load(@Nonnull Lattice.Column key) throws Exception {
+                return provider.cardinality(ImmutableList.of(key));
               }
             });
   }
 
-  public int cardinality(Lattice lattice, Lattice.Column column) {
-    try {
-      return cache.get(Pair.of(lattice, column));
-    } catch (UncheckedExecutionException | ExecutionException e) {
-      Util.throwIfUnchecked(e.getCause());
-      throw new RuntimeException(e.getCause());
+  public double cardinality(List<Lattice.Column> columns) {
+    final List<Double> counts = new ArrayList<>();
+    for (Lattice.Column column : columns) {
+      try {
+        counts.add(cache.get(column));
+      } catch (UncheckedExecutionException | ExecutionException e) {
+        Util.throwIfUnchecked(e.getCause());
+        throw new RuntimeException(e.getCause());
+      }
     }
+    return (int) Lattice.getRowCount(lattice.getFactRowCount(), counts);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/dad58186/core/src/main/java/org/apache/calcite/materialize/DelegatingLatticeStatisticProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/materialize/DelegatingLatticeStatisticProvider.java b/core/src/main/java/org/apache/calcite/materialize/DelegatingLatticeStatisticProvider.java
index e1ec6c5..f889dae 100644
--- a/core/src/main/java/org/apache/calcite/materialize/DelegatingLatticeStatisticProvider.java
+++ b/core/src/main/java/org/apache/calcite/materialize/DelegatingLatticeStatisticProvider.java
@@ -16,6 +16,8 @@
  */
 package org.apache.calcite.materialize;
 
+import java.util.List;
+
 /**
  * Implementation of {@link LatticeStatisticProvider} that delegates
  * to an underlying provider.
@@ -33,8 +35,8 @@ public class DelegatingLatticeStatisticProvider
     this.provider = provider;
   }
 
-  public int cardinality(Lattice lattice, Lattice.Column column) {
-    return provider.cardinality(lattice, column);
+  public double cardinality(List<Lattice.Column> columns) {
+    return provider.cardinality(columns);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/dad58186/core/src/main/java/org/apache/calcite/materialize/Lattice.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/materialize/Lattice.java b/core/src/main/java/org/apache/calcite/materialize/Lattice.java
index 20a28a7..3828416 100644
--- a/core/src/main/java/org/apache/calcite/materialize/Lattice.java
+++ b/core/src/main/java/org/apache/calcite/materialize/Lattice.java
@@ -19,6 +19,7 @@ package org.apache.calcite.materialize;
 import org.apache.calcite.avatica.AvaticaUtils;
 import org.apache.calcite.jdbc.CalcitePrepare;
 import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.linq4j.tree.Primitive;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.prepare.CalcitePrepareImpl;
 import org.apache.calcite.rel.RelNode;
@@ -60,7 +61,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
 
-import java.math.BigInteger;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -113,16 +114,15 @@ public class Lattice {
 
   private Lattice(CalciteSchema rootSchema, ImmutableList<Node> nodes,
       boolean auto, boolean algorithm, long algorithmMaxMillis,
-      LatticeStatisticProvider statisticProvider, Double rowCountEstimate,
-      ImmutableList<Column> columns, ImmutableList<Measure> defaultMeasures,
-      ImmutableList<Tile> tiles) {
+      LatticeStatisticProvider.Factory statisticProviderFactory,
+      Double rowCountEstimate, ImmutableList<Column> columns,
+      ImmutableList<Measure> defaultMeasures, ImmutableList<Tile> tiles) {
     this.rootSchema = rootSchema;
     this.nodes = Preconditions.checkNotNull(nodes);
     this.columns = Preconditions.checkNotNull(columns);
     this.auto = auto;
     this.algorithm = algorithm;
     this.algorithmMaxMillis = algorithmMaxMillis;
-    this.statisticProvider = Preconditions.checkNotNull(statisticProvider);
     this.defaultMeasures = Preconditions.checkNotNull(defaultMeasures);
     this.tiles = Preconditions.checkNotNull(tiles);
 
@@ -151,6 +151,8 @@ public class Lattice {
     }
     Preconditions.checkArgument(rowCountEstimate > 0d);
     this.rowCountEstimate = rowCountEstimate;
+    this.statisticProvider =
+        Preconditions.checkNotNull(statisticProviderFactory.apply(this));
   }
 
   /** Creates a Lattice. */
@@ -233,74 +235,92 @@ public class Lattice {
     throw new AssertionError("input not found");
   }
 
+  /** Generates a SQL query to populate a tile of the lattice specified by a
+   * given set of columns and measures. */
   public String sql(ImmutableBitSet groupSet, List<Measure> aggCallList) {
-    final ImmutableBitSet.Builder columnSetBuilder = groupSet.rebuild();
-    for (Measure call : aggCallList) {
-      for (Column arg : call.args) {
-        columnSetBuilder.set(arg.ordinal);
+    return sql(groupSet, true, aggCallList);
+  }
+
+  /** Generates a SQL query to populate a tile of the lattice specified by a
+   * given set of columns and measures, optionally grouping. */
+  public String sql(ImmutableBitSet groupSet, boolean group,
+      List<Measure> aggCallList) {
+    final List<Node> usedNodes = new ArrayList<>();
+    if (group) {
+      final ImmutableBitSet.Builder columnSetBuilder = groupSet.rebuild();
+      for (Measure call : aggCallList) {
+        for (Column arg : call.args) {
+          columnSetBuilder.set(arg.ordinal);
+        }
       }
-    }
-    final ImmutableBitSet columnSet = columnSetBuilder.build();
+      final ImmutableBitSet columnSet = columnSetBuilder.build();
 
-    // Figure out which nodes are needed. Use a node if its columns are used
-    // or if has a child whose columns are used.
-    List<Node> usedNodes = Lists.newArrayList();
-    for (Node node : nodes) {
-      if (ImmutableBitSet.range(node.startCol, node.endCol)
-          .intersects(columnSet)) {
-        use(usedNodes, node);
+      // Figure out which nodes are needed. Use a node if its columns are used
+      // or if has a child whose columns are used.
+      for (Node node : nodes) {
+        if (ImmutableBitSet.range(node.startCol, node.endCol)
+            .intersects(columnSet)) {
+          use(usedNodes, node);
+        }
       }
+      if (usedNodes.isEmpty()) {
+        usedNodes.add(nodes.get(0));
+      }
+    } else {
+      usedNodes.addAll(nodes);
     }
-    if (usedNodes.isEmpty()) {
-      usedNodes.add(nodes.get(0));
-    }
+
     final SqlDialect dialect = SqlDialect.DatabaseProduct.CALCITE.getDialect();
     final StringBuilder buf = new StringBuilder("SELECT ");
     final StringBuilder groupBuf = new StringBuilder("\nGROUP BY ");
     int k = 0;
     final Set<String> columnNames = Sets.newHashSet();
-    for (int i : groupSet) {
-      if (k++ > 0) {
-        buf.append(", ");
-        groupBuf.append(", ");
+    if (groupSet != null) {
+      for (int i : groupSet) {
+        if (k++ > 0) {
+          buf.append(", ");
+          groupBuf.append(", ");
+        }
+        final Column column = columns.get(i);
+        dialect.quoteIdentifier(buf, column.identifiers());
+        dialect.quoteIdentifier(groupBuf, column.identifiers());
+        final String fieldName = uniqueColumnNames.get(i);
+        columnNames.add(fieldName);
+        if (!column.alias.equals(fieldName)) {
+          buf.append(" AS ");
+          dialect.quoteIdentifier(buf, fieldName);
+        }
       }
-      final Column column = columns.get(i);
-      dialect.quoteIdentifier(buf, column.identifiers());
-      dialect.quoteIdentifier(groupBuf, column.identifiers());
-      final String fieldName = uniqueColumnNames.get(i);
-      columnNames.add(fieldName);
-      if (!column.alias.equals(fieldName)) {
-        buf.append(" AS ");
-        dialect.quoteIdentifier(buf, fieldName);
+      if (groupSet.isEmpty()) {
+        groupBuf.append("()");
       }
-    }
-    if (groupSet.isEmpty()) {
-      groupBuf.append("()");
-    }
-    int m = 0;
-    for (Measure measure : aggCallList) {
-      if (k++ > 0) {
-        buf.append(", ");
-      }
-      buf.append(measure.agg.getName())
-          .append("(");
-      if (measure.args.isEmpty()) {
-        buf.append("*");
-      } else {
-        int z = 0;
-        for (Column arg : measure.args) {
-          if (z++ > 0) {
-            buf.append(", ");
+      int m = 0;
+      for (Measure measure : aggCallList) {
+        if (k++ > 0) {
+          buf.append(", ");
+        }
+        buf.append(measure.agg.getName())
+            .append("(");
+        if (measure.args.isEmpty()) {
+          buf.append("*");
+        } else {
+          int z = 0;
+          for (Column arg : measure.args) {
+            if (z++ > 0) {
+              buf.append(", ");
+            }
+            dialect.quoteIdentifier(buf, arg.identifiers());
           }
-          dialect.quoteIdentifier(buf, arg.identifiers());
         }
+        buf.append(") AS ");
+        String measureName;
+        while (!columnNames.add(measureName = "m" + m)) {
+          ++m;
+        }
+        dialect.quoteIdentifier(buf, measureName);
       }
-      buf.append(") AS ");
-      String measureName;
-      while (!columnNames.add(measureName = "m" + m)) {
-        ++m;
-      }
-      dialect.quoteIdentifier(buf, measureName);
+    } else {
+      buf.append("*");
     }
     buf.append("\nFROM ");
     for (Node node : usedNodes) {
@@ -329,7 +349,9 @@ public class Lattice {
       System.out.println("Lattice SQL:\n"
           + buf);
     }
-    buf.append(groupBuf);
+    if (group) {
+      buf.append(groupBuf);
+    }
     return buf.toString();
   }
 
@@ -381,30 +403,40 @@ public class Lattice {
   /** Returns an estimate of the number of rows in the tile with the given
    * dimensions. */
   public double getRowCount(List<Column> columns) {
+    return statisticProvider.cardinality(columns);
+  }
+
+  /** Returns an estimate of the number of rows in the tile with the given
+   * dimensions. */
+  public static double getRowCount(double factCount, double... columnCounts) {
+    return getRowCount(factCount, Primitive.asList(columnCounts));
+  }
+
+  /** Returns an estimate of the number of rows in the tile with the given
+   * dimensions. */
+  public static double getRowCount(double factCount,
+      List<Double> columnCounts) {
     // The expected number of distinct values when choosing p values
     // with replacement from n integers is n . (1 - ((n - 1) / n) ^ p).
     //
     // If we have several uniformly distributed attributes A1 ... Am
     // with N1 ... Nm distinct values, they behave as one uniformly
     // distributed attribute with N1 * ... * Nm distinct values.
-    BigInteger n = BigInteger.ONE;
-    for (Column column : columns) {
-      final int cardinality = statisticProvider.cardinality(this, column);
-      if (cardinality > 1) {
-        n = n.multiply(BigInteger.valueOf(cardinality));
+    double n = 1d;
+    for (Double columnCount : columnCounts) {
+      if (columnCount > 1d) {
+        n *= columnCount;
       }
     }
-    final double nn = n.doubleValue();
-    final double f = getFactRowCount();
-    final double a = (nn - 1d) / nn;
+    final double a = (n - 1d) / n;
     if (a == 1d) {
       // A under-flows if nn is large.
-      return f;
+      return factCount;
     }
-    final double v = nn * (1d - Math.pow(a, f));
+    final double v = n * (1d - Math.pow(a, factCount));
     // Cap at fact-row-count, because numerical artifacts can cause it
     // to go a few % over.
-    return Math.min(v, f);
+    return Math.min(v, factCount);
   }
 
   /** Source relation of a lattice.
@@ -535,6 +567,15 @@ public class Lattice {
       this.alias = Preconditions.checkNotNull(alias);
     }
 
+    /** Converts a list of columns to a bit set of their ordinals. */
+    static ImmutableBitSet toBitSet(List<Column> columns) {
+      final ImmutableBitSet.Builder builder = ImmutableBitSet.builder();
+      for (Column column : columns) {
+        builder.set(column.ordinal);
+      }
+      return builder.build();
+    }
+
     public int compareTo(Column column) {
       return Utilities.compare(ordinal, column.ordinal);
     }
@@ -690,9 +731,10 @@ public class Lattice {
 
     /** Builds a lattice. */
     public Lattice build() {
-      LatticeStatisticProvider statisticProvider =
+      LatticeStatisticProvider.Factory statisticProvider =
           this.statisticProvider != null
-              ? AvaticaUtils.instantiatePlugin(LatticeStatisticProvider.class,
+              ? AvaticaUtils.instantiatePlugin(
+                  LatticeStatisticProvider.Factory.class,
                   this.statisticProvider)
               : Lattices.CACHED_SQL;
       Preconditions.checkArgument(rootSchema.isRoot(), "must be root schema");
@@ -815,15 +857,11 @@ public class Lattice {
 
     public Tile(ImmutableList<Measure> measures,
         ImmutableList<Column> dimensions) {
-      this.measures = measures;
-      this.dimensions = dimensions;
+      this.measures = Preconditions.checkNotNull(measures);
+      this.dimensions = Preconditions.checkNotNull(dimensions);
       assert Ordering.natural().isStrictlyOrdered(dimensions);
       assert Ordering.natural().isStrictlyOrdered(measures);
-      final ImmutableBitSet.Builder bitSetBuilder = ImmutableBitSet.builder();
-      for (Column dimension : dimensions) {
-        bitSetBuilder.set(dimension.ordinal);
-      }
-      bitSet = bitSetBuilder.build();
+      bitSet = Column.toBitSet(dimensions);
     }
 
     public static TileBuilder builder() {

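To make the distinct-value formula in getRowCount above concrete, here is a
small worked example; the numbers are purely illustrative:

  // 1,000 fact rows; two dimension columns with 10 and 20 distinct values.
  // n = 10 * 20 = 200, so the estimate is
  //   200 * (1 - (199/200) ^ 1000) ~= 198.7,
  // capped at the fact row count.
  double estimate = Lattice.getRowCount(1000d, 10d, 20d);  // ~= 198.7
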
http://git-wip-us.apache.org/repos/asf/calcite/blob/dad58186/core/src/main/java/org/apache/calcite/materialize/LatticeStatisticProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/materialize/LatticeStatisticProvider.java b/core/src/main/java/org/apache/calcite/materialize/LatticeStatisticProvider.java
index 46668cc..d16e903 100644
--- a/core/src/main/java/org/apache/calcite/materialize/LatticeStatisticProvider.java
+++ b/core/src/main/java/org/apache/calcite/materialize/LatticeStatisticProvider.java
@@ -16,12 +16,22 @@
  */
 package org.apache.calcite.materialize;
 
+import com.google.common.base.Function;
+
+import java.util.List;
+
 /**
  * Estimates row counts for a lattice and its attributes.
  */
 public interface LatticeStatisticProvider {
-  /** Returns an estimate of the number of distinct values in a column. */
-  int cardinality(Lattice lattice, Lattice.Column column);
+  /** Returns an estimate of the number of distinct values in a column
+   * or list of columns. */
+  double cardinality(List<Lattice.Column> columns);
+
+  /** Creates a {@link LatticeStatisticProvider} for a given
+   * {@link org.apache.calcite.materialize.Lattice}. */
+  interface Factory extends Function<Lattice, LatticeStatisticProvider> {
+  }
 }
 
 // End LatticeStatisticProvider.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/dad58186/core/src/main/java/org/apache/calcite/materialize/Lattices.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/materialize/Lattices.java b/core/src/main/java/org/apache/calcite/materialize/Lattices.java
index cf70719..2a0a7ea 100644
--- a/core/src/main/java/org/apache/calcite/materialize/Lattices.java
+++ b/core/src/main/java/org/apache/calcite/materialize/Lattices.java
@@ -23,18 +23,16 @@ public class Lattices {
   private Lattices() {}
 
   /** Statistics provider that uses SQL. */
-  public static final LatticeStatisticProvider SQL =
-      SqlLatticeStatisticProvider.INSTANCE;
+  public static final LatticeStatisticProvider.Factory SQL =
+      SqlLatticeStatisticProvider.FACTORY;
 
   /** Statistics provider that uses SQL then stores the results in a cache. */
-  public static final LatticeStatisticProvider CACHED_SQL =
-      cache(SqlLatticeStatisticProvider.INSTANCE);
+  public static final LatticeStatisticProvider.Factory CACHED_SQL =
+      SqlLatticeStatisticProvider.CACHED_FACTORY;
 
-  /** Wraps a statistic provider in a cache. */
-  public static LatticeStatisticProvider cache(
-      LatticeStatisticProvider provider) {
-    return new CachingLatticeStatisticProvider(provider);
-  }
+  /** Statistics provider that uses a profiler. */
+  public static final LatticeStatisticProvider.Factory PROFILER =
+      ProfilerLatticeStatisticProvider.FACTORY;
 }
 
 // End Lattices.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/dad58186/core/src/main/java/org/apache/calcite/materialize/ProfilerLatticeStatisticProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/materialize/ProfilerLatticeStatisticProvider.java b/core/src/main/java/org/apache/calcite/materialize/ProfilerLatticeStatisticProvider.java
new file mode 100644
index 0000000..39e0b29
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/materialize/ProfilerLatticeStatisticProvider.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.materialize;
+
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.function.Function1;
+import org.apache.calcite.profile.Profiler;
+import org.apache.calcite.profile.ProfilerImpl;
+import org.apache.calcite.rel.metadata.NullSentinel;
+import org.apache.calcite.schema.ScannableTable;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.util.ImmutableBitSet;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableList;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Implementation of {@link LatticeStatisticProvider} that uses a
+ * {@link org.apache.calcite.profile.Profiler}.
+ */
+class ProfilerLatticeStatisticProvider implements LatticeStatisticProvider {
+  static final Factory FACTORY =
+      new Factory() {
+        public LatticeStatisticProvider apply(Lattice lattice) {
+          return new ProfilerLatticeStatisticProvider(lattice);
+        }
+      };
+
+  /** Converts an array of values to a list of {@link Comparable} values,
+   * converting null values to sentinels. */
+  private static final Function1<Object[], List<Comparable>> TO_LIST =
+      new Function1<Object[], List<Comparable>>() {
+        public List<Comparable> apply(Object[] values) {
+          for (int i = 0; i < values.length; i++) {
+            if (values[i] == null) {
+              values[i] = NullSentinel.INSTANCE;
+            }
+          }
+          //noinspection unchecked
+          return (List) Arrays.asList(values);
+        }
+      };
+
+  private final Lattice lattice;
+  private final Supplier<Profiler.Profile> profile =
+      Suppliers.memoize(new Supplier<Profiler.Profile>() {
+        public Profiler.Profile get() {
+          final ProfilerImpl profiler =
+              ProfilerImpl.builder()
+                  .withPassSize(200)
+                  .withMinimumSurprise(0.3D)
+                  .build();
+          final List<Profiler.Column> columns = new ArrayList<>();
+          for (Lattice.Column column : lattice.columns) {
+            columns.add(new Profiler.Column(column.ordinal, column.alias));
+          }
+          final String sql =
+              lattice.sql(ImmutableBitSet.range(lattice.columns.size()),
+                  false, ImmutableList.<Lattice.Measure>of());
+          final Table table =
+              new MaterializationService.DefaultTableFactory()
+                  .createTable(lattice.rootSchema, sql,
+                      ImmutableList.<String>of());
+          final ImmutableList<ImmutableBitSet> initialGroups =
+              ImmutableList.of();
+          final Enumerable<List<Comparable>> rows =
+              ((ScannableTable) table).scan(null).select(TO_LIST);
+          return profiler.profile(rows, columns, initialGroups);
+        }
+      });
+
+  /** Creates a ProfilerLatticeStatisticProvider. */
+  private ProfilerLatticeStatisticProvider(Lattice lattice) {
+    this.lattice = Preconditions.checkNotNull(lattice);
+  }
+
+  public double cardinality(List<Lattice.Column> columns) {
+    final ImmutableBitSet build = Lattice.Column.toBitSet(columns);
+    final double cardinality = profile.get().cardinality(build);
+//    System.out.println(columns + ": " + cardinality);
+    return cardinality;
+  }
+}
+
+// End ProfilerLatticeStatisticProvider.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/dad58186/core/src/main/java/org/apache/calcite/materialize/SqlLatticeStatisticProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/materialize/SqlLatticeStatisticProvider.java b/core/src/main/java/org/apache/calcite/materialize/SqlLatticeStatisticProvider.java
index 4a94847..8e5b19d 100644
--- a/core/src/main/java/org/apache/calcite/materialize/SqlLatticeStatisticProvider.java
+++ b/core/src/main/java/org/apache/calcite/materialize/SqlLatticeStatisticProvider.java
@@ -20,28 +20,56 @@ import org.apache.calcite.schema.ScannableTable;
 import org.apache.calcite.schema.Table;
 import org.apache.calcite.util.ImmutableBitSet;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * Implementation of {@link LatticeStatisticProvider} that gets statistics by
  * executing "SELECT COUNT(DISTINCT ...) ..." SQL queries.
  */
 class SqlLatticeStatisticProvider implements LatticeStatisticProvider {
-  static final SqlLatticeStatisticProvider INSTANCE =
-      new SqlLatticeStatisticProvider();
+  static final Factory FACTORY =
+      new LatticeStatisticProvider.Factory() {
+        public LatticeStatisticProvider apply(Lattice lattice) {
+          return new SqlLatticeStatisticProvider(lattice);
+        }
+      };
+
+  static final Factory CACHED_FACTORY =
+      new LatticeStatisticProvider.Factory() {
+        public LatticeStatisticProvider apply(Lattice lattice) {
+          LatticeStatisticProvider provider = FACTORY.apply(lattice);
+          return new CachingLatticeStatisticProvider(lattice, provider);
+        }
+      };
+
+  private final Lattice lattice;
 
-  /** Creates an SqlLatticeStatisticProvider. */
-  private SqlLatticeStatisticProvider() {}
+  /** Creates a SqlLatticeStatisticProvider. */
+  private SqlLatticeStatisticProvider(Lattice lattice) {
+    this.lattice = Preconditions.checkNotNull(lattice);
+  }
+
+  public double cardinality(List<Lattice.Column> columns) {
+    final List<Double> counts = new ArrayList<>();
+    for (Lattice.Column column : columns) {
+      counts.add(cardinality(lattice, column));
+    }
+    return (int) Lattice.getRowCount(lattice.getFactRowCount(), counts);
+  }
 
-  @Override public int cardinality(Lattice lattice, Lattice.Column column) {
+  private double cardinality(Lattice lattice, Lattice.Column column) {
     final String sql = lattice.countSql(ImmutableBitSet.of(column.ordinal));
     final Table table =
         new MaterializationService.DefaultTableFactory()
             .createTable(lattice.rootSchema, sql, ImmutableList.<String>of());
     final Object[] values =
         Iterables.getOnlyElement(((ScannableTable) table).scan(null));
-    return ((Number) values[0]).intValue();
+    return ((Number) values[0]).doubleValue();
   }
 }
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/dad58186/core/src/main/java/org/apache/calcite/profile/Profiler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/profile/Profiler.java b/core/src/main/java/org/apache/calcite/profile/Profiler.java
new file mode 100644
index 0000000..851f40b
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/profile/Profiler.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.profile;
+
+import org.apache.calcite.materialize.Lattice;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.JsonBuilder;
+import org.apache.calcite.util.Util;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSortedSet;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.SortedSet;
+import javax.annotation.Nonnull;
+
+/**
+ * Analyzes data sets.
+ */
+public interface Profiler {
+  /** Creates a profile of a data set.
+   *
+   * @param rows List of rows. Can be iterated over more than once (maybe not
+   *             cheaply)
+   * @param columns Column definitions
+   *
+   * @param initialGroups List of combinations of columns that should be
+   *                     profiled early, because they may be interesting
+   *
+   * @return A profile describing relationships within the data set
+   */
+  Profile profile(Iterable<List<Comparable>> rows, List<Column> columns,
+      Collection<ImmutableBitSet> initialGroups);
+
+  /** Column. */
+  class Column implements Comparable<Column> {
+    public final int ordinal;
+    public final String name;
+
+    /** Creates a Column.
+     *
+     * @param ordinal Unique and contiguous within a particular data set
+     * @param name Name of the column
+     */
+    public Column(int ordinal, String name) {
+      this.ordinal = ordinal;
+      this.name = name;
+    }
+
+    static ImmutableBitSet toOrdinals(Iterable<Column> columns) {
+      final ImmutableBitSet.Builder builder = ImmutableBitSet.builder();
+      for (Column column : columns) {
+        builder.set(column.ordinal);
+      }
+      return builder.build();
+    }
+
+    @Override public int hashCode() {
+      return ordinal;
+    }
+
+    @Override public boolean equals(Object o) {
+      return this == o
+          || o instanceof Column
+          && ordinal == ((Column) o).ordinal;
+    }
+
+    @Override public int compareTo(@Nonnull Column column) {
+      return Integer.compare(ordinal, column.ordinal);
+    }
+
+    @Override public String toString() {
+      return name;
+    }
+  }
+
+  /** Statistic produced by the profiler. */
+  interface Statistic {
+    Object toMap(JsonBuilder jsonBuilder);
+  }
+
+  /** Whole data set. */
+  class RowCount implements Statistic {
+    final int rowCount;
+
+    public RowCount(int rowCount) {
+      this.rowCount = rowCount;
+    }
+
+    public Object toMap(JsonBuilder jsonBuilder) {
+      final Map<String, Object> map = jsonBuilder.map();
+      map.put("type", "rowCount");
+      map.put("rowCount", rowCount);
+      return map;
+    }
+  }
+
+  /** Unique key. */
+  class Unique implements Statistic {
+    final NavigableSet<Column> columns;
+
+    public Unique(SortedSet<Column> columns) {
+      this.columns = ImmutableSortedSet.copyOf(columns);
+    }
+
+    public Object toMap(JsonBuilder jsonBuilder) {
+      final Map<String, Object> map = jsonBuilder.map();
+      map.put("type", "unique");
+      map.put("columns", FunctionalDependency.getObjects(jsonBuilder, columns));
+      return map;
+    }
+  }
+
+  /** Functional dependency. */
+  class FunctionalDependency implements Statistic {
+    final NavigableSet<Column> columns;
+    final Column dependentColumn;
+
+    FunctionalDependency(SortedSet<Column> columns, Column dependentColumn) {
+      this.columns = ImmutableSortedSet.copyOf(columns);
+      this.dependentColumn = dependentColumn;
+    }
+
+    public Object toMap(JsonBuilder jsonBuilder) {
+      final Map<String, Object> map = jsonBuilder.map();
+      map.put("type", "fd");
+      map.put("columns", getObjects(jsonBuilder, columns));
+      map.put("dependentColumn", dependentColumn.name);
+      return map;
+    }
+
+    private static List<Object> getObjects(JsonBuilder jsonBuilder,
+        NavigableSet<Column> columns) {
+      final List<Object> list = jsonBuilder.list();
+      for (Column column : columns) {
+        list.add(column.name);
+      }
+      return list;
+    }
+  }
+
+  /** Value distribution, including cardinality and optionally values, of a
+   * column or set of columns. If the set of columns is empty, it describes
+   * the number of rows in the entire data set. */
+  class Distribution implements Statistic {
+    final NavigableSet<Column> columns;
+    final NavigableSet<Comparable> values;
+    final double cardinality;
+    final int nullCount;
+    final double expectedCardinality;
+    final boolean minimal;
+
+    /** Creates a Distribution.
+     *
+     * @param columns Column or columns being described
+     * @param values Values of columns, or null if there are too many
+     * @param cardinality Number of distinct values
+     * @param nullCount Number of rows where this column had a null value;
+     * @param expectedCardinality Expected cardinality
+     * @param minimal Whether the distribution is not implied by a unique
+     *   or functional dependency
+     */
+    public Distribution(SortedSet<Column> columns, SortedSet<Comparable> values,
+        double cardinality, int nullCount, double expectedCardinality,
+        boolean minimal) {
+      this.columns = ImmutableSortedSet.copyOf(columns);
+      this.values = values == null ? null : ImmutableSortedSet.copyOf(values);
+      this.cardinality = cardinality;
+      this.nullCount = nullCount;
+      this.expectedCardinality = expectedCardinality;
+      this.minimal = minimal;
+    }
+
+    public Object toMap(JsonBuilder jsonBuilder) {
+      final Map<String, Object> map = jsonBuilder.map();
+      map.put("type", "distribution");
+      map.put("columns", FunctionalDependency.getObjects(jsonBuilder, columns));
+      if (values != null) {
+        List<Object> list = jsonBuilder.list();
+        for (Comparable value : values) {
+          if (value instanceof java.sql.Date) {
+            value = value.toString();
+          }
+          list.add(value);
+        }
+        map.put("values", list);
+      }
+      map.put("cardinality", cardinality);
+      if (nullCount > 0) {
+        map.put("nullCount", nullCount);
+      }
+      map.put("expectedCardinality", expectedCardinality);
+      map.put("surprise", surprise());
+      return map;
+    }
+
+    ImmutableBitSet columnOrdinals() {
+      return Column.toOrdinals(columns);
+    }
+
+    double surprise() {
+      return SimpleProfiler.surprise(expectedCardinality, cardinality);
+    }
+  }
+
+  /** The result of profiling, contains various statistics about the
+   * data in a table. */
+  class Profile {
+    public final RowCount rowCount;
+    public final List<FunctionalDependency> functionalDependencyList;
+    public final List<Distribution> distributionList;
+    public final List<Unique> uniqueList;
+
+    private final Map<ImmutableBitSet, Distribution> distributionMap;
+    private final List<Distribution> singletonDistributionList;
+
+    Profile(List<Column> columns, RowCount rowCount,
+        Iterable<FunctionalDependency> functionalDependencyList,
+        Iterable<Distribution> distributionList, Iterable<Unique> uniqueList) {
+      this.rowCount = rowCount;
+      this.functionalDependencyList =
+          ImmutableList.copyOf(functionalDependencyList);
+      this.distributionList = ImmutableList.copyOf(distributionList);
+      this.uniqueList = ImmutableList.copyOf(uniqueList);
+
+      final ImmutableMap.Builder<ImmutableBitSet, Distribution> m =
+          ImmutableMap.builder();
+      for (Distribution distribution : distributionList) {
+        m.put(distribution.columnOrdinals(), distribution);
+      }
+      distributionMap = m.build();
+
+      final ImmutableList.Builder<Distribution> b = ImmutableList.builder();
+      for (int i = 0; i < columns.size(); i++) {
+        b.add(distributionMap.get(ImmutableBitSet.of(i)));
+      }
+      singletonDistributionList = b.build();
+    }
+
+    public List<Statistic> statistics() {
+      return ImmutableList.<Statistic>builder()
+          .add(rowCount)
+          .addAll(functionalDependencyList)
+          .addAll(distributionList)
+          .addAll(uniqueList)
+          .build();
+    }
+
+    public double cardinality(ImmutableBitSet columnOrdinals) {
+      final ImmutableBitSet originalOrdinals = columnOrdinals;
+      for (;;) {
+        final Distribution distribution = distributionMap.get(columnOrdinals);
+        if (distribution != null) {
+          if (columnOrdinals == originalOrdinals) {
+            return distribution.cardinality;
+          } else {
+            final List<Double> cardinalityList = new ArrayList<>();
+            cardinalityList.add(distribution.cardinality);
+            for (int ordinal : originalOrdinals.except(columnOrdinals)) {
+              final Distribution d = singletonDistributionList.get(ordinal);
+              cardinalityList.add(d.cardinality);
+            }
+            return Lattice.getRowCount(rowCount.rowCount, cardinalityList);
+          }
+        }
+        // Clear the last bit and iterate.
+        // Better would be to combine all of our nearest ancestors.
+        final List<Integer> list = columnOrdinals.asList();
+        columnOrdinals = columnOrdinals.clear(Util.last(list));
+      }
+    }
+  }
+}
+
+// End Profiler.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/dad58186/core/src/main/java/org/apache/calcite/profile/ProfilerImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/profile/ProfilerImpl.java b/core/src/main/java/org/apache/calcite/profile/ProfilerImpl.java
new file mode 100644
index 0000000..139f13d
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/profile/ProfilerImpl.java
@@ -0,0 +1,809 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.profile;
+
+import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.linq4j.tree.Primitive;
+import org.apache.calcite.materialize.Lattice;
+import org.apache.calcite.prepare.CalcitePrepareImpl;
+import org.apache.calcite.rel.metadata.NullSentinel;
+import org.apache.calcite.runtime.FlatLists;
+import org.apache.calcite.runtime.PredicateImpl;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.PartiallyOrderedSet;
+import org.apache.calcite.util.Util;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Ordering;
+import com.yahoo.sketches.hll.HllSketch;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import static org.apache.calcite.profile.ProfilerImpl.CompositeCollector.OF;
+
+/**
+ * Implementation of {@link Profiler} that only investigates "interesting"
+ * combinations of columns.
+ */
+public class ProfilerImpl implements Profiler {
+  /** The number of combinations to consider per pass.
+   * The number is determined by memory, but a value of 1,000 is typical.
+   * You need 2KB memory per sketch, and one sketch for each combination. */
+  private final int combinationsPerPass;
+
+  /** The minimum number of combinations considered "interesting". After that,
+   * a combination is only considered "interesting" if its surprise is greater
+   * than the median surprise. */
+  private final int interestingCount;
+
+  /** Whether a successor is considered interesting enough to analyze. */
+  private final Predicate<Pair<Space, Column>> predicate;
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Creates a {@code ProfilerImpl}.
+   *
+   * @param combinationsPerPass Maximum number of columns (or combinations of
+   *   columns) to compute each pass
+   * @param interestingCount Minimum number of combinations considered
+   *   interesting
+   * @param predicate Whether a successor is considered interesting enough to
+   *   analyze
+   */
+  ProfilerImpl(int combinationsPerPass,
+      int interestingCount, Predicate<Pair<Space, Column>> predicate) {
+    Preconditions.checkArgument(combinationsPerPass > 2);
+    Preconditions.checkArgument(interestingCount > 2);
+    this.combinationsPerPass = combinationsPerPass;
+    this.interestingCount = interestingCount;
+    this.predicate = predicate;
+  }
+
+  public Profile profile(Iterable<List<Comparable>> rows,
+      final List<Column> columns, Collection<ImmutableBitSet> initialGroups) {
+    return new Run(columns, initialGroups).profile(rows);
+  }
+
+  /** A run of the profiler. */
+  class Run {
+    private final List<Column> columns;
+    final PartiallyOrderedSet<ImmutableBitSet> keyPoset =
+        new PartiallyOrderedSet<>(
+            PartiallyOrderedSet.BIT_SET_INCLUSION_ORDERING);
+    final Map<ImmutableBitSet, Distribution> distributions = new HashMap<>();
+    /** List of spaces that have one column. */
+    final List<Space> singletonSpaces;
+    /** Combinations of columns that we have computed but whose successors have
+     * not yet been computed. We may add some of those successors to
+     * {@link #spaceQueue}. */
+    final Queue<Space> doneQueue =
+        new PriorityQueue<>(100,
+          new Comparator<Space>() {
+            public int compare(Space s0, Space s1) {
+              // The space with 0 columns is more interesting than
+              // any space with 1 column, and so forth.
+              // For spaces with 2 or more columns we compare "surprise":
+              // how many fewer values did it have than expected?
+              int c = Integer.compare(s0.columns.size(), s1.columns.size());
+              if (c == 0) {
+                c = Double.compare(s0.surprise(), s1.surprise());
+              }
+              return c;
+            }
+          });
+    final SurpriseQueue surprises;
+
+    /** Combinations of columns that we will compute next pass. */
+    final Deque<ImmutableBitSet> spaceQueue = new ArrayDeque<>();
+    final List<Unique> uniques = new ArrayList<>();
+    final List<FunctionalDependency> functionalDependencies = new ArrayList<>();
+    /** Column ordinals that have ever been placed on {@link #spaceQueue}.
+     * Ensures that we do not calculate the same combination more than once,
+     * even though we generate a column set from multiple parents. */
+    final Set<ImmutableBitSet> resultSet = new HashSet<>();
+    final PartiallyOrderedSet<Space> results = new PartiallyOrderedSet<>(
+        new PartiallyOrderedSet.Ordering<Space>() {
+          public boolean lessThan(Space e1, Space e2) {
+            return e2.columnOrdinals.contains(e1.columnOrdinals);
+          }
+        });
+    private final List<ImmutableBitSet> keyOrdinalLists =
+        new ArrayList<>();
+    final Function<Integer, Column> get =
+        new Function<Integer, Column>() {
+          public Column apply(Integer input) {
+            return columns.get(input);
+          }
+        };
+    private int rowCount;
+
+    /**
+     * Creates a Run.
+     *
+     * @param columns List of columns
+     *
+     * @param initialGroups List of combinations of columns that should be
+     *                     profiled early, because they may be interesting
+     */
+    Run(final List<Column> columns, Collection<ImmutableBitSet> initialGroups) {
+      this.columns = ImmutableList.copyOf(columns);
+      for (Ord<Column> column : Ord.zip(columns)) {
+        if (column.e.ordinal != column.i) {
+          throw new IllegalArgumentException();
+        }
+      }
+      this.singletonSpaces =
+          new ArrayList<>(Collections.nCopies(columns.size(), (Space) null));
+      if (combinationsPerPass > Math.pow(2D, columns.size())) {
+        // There are not many columns. We can compute all combinations in the
+        // first pass.
+        for (ImmutableBitSet ordinals
+            : ImmutableBitSet.range(columns.size()).powerSet()) {
+          spaceQueue.add(ordinals);
+        }
+      } else {
+        // We will need to take multiple passes.
+        // Pass 0, just put the empty combination on the queue.
+        // Next pass, we will do its successors, the singleton combinations.
+        spaceQueue.add(ImmutableBitSet.of());
+        spaceQueue.addAll(initialGroups);
+        if (columns.size() < combinationsPerPass) {
+          // There are not very many columns. Compute the singleton
+          // groups in pass 0.
+          for (Column column : columns) {
+            spaceQueue.add(ImmutableBitSet.of(column.ordinal));
+          }
+        }
+      }
+      // The surprise queue must have enough room for all singleton groups
+      // plus all initial groups.
+      surprises = new SurpriseQueue(1 + columns.size() + initialGroups.size(),
+          interestingCount);
+    }
+
+    Profile profile(Iterable<List<Comparable>> rows) {
+      int pass = 0;
+      for (;;) {
+        final List<Space> spaces = nextBatch(pass);
+        if (spaces.isEmpty()) {
+          break;
+        }
+        pass(pass++, spaces, rows);
+      }
+
+      for (Space s : singletonSpaces) {
+        for (ImmutableBitSet dependent : s.dependents) {
+          functionalDependencies.add(
+              new FunctionalDependency(toColumns(dependent),
+                  Iterables.getOnlyElement(s.columns)));
+        }
+      }
+      return new Profile(columns, new RowCount(rowCount),
+          functionalDependencies, distributions.values(), uniques);
+    }
+
+    /** Populates {@code spaces} with the next batch.
+     * Returns an empty list if done. */
+    List<Space> nextBatch(int pass) {
+      final List<Space> spaces = new ArrayList<>();
+    loop:
+      for (;;) {
+        if (spaces.size() >= combinationsPerPass) {
+          // We have enough for the next pass.
+          return spaces;
+        }
+        // First, see if there is a space we did have room for last pass.
+        final ImmutableBitSet ordinals = spaceQueue.poll();
+        if (ordinals != null) {
+          final Space space = new Space(this, ordinals, toColumns(ordinals));
+          spaces.add(space);
+          if (ordinals.cardinality() == 1) {
+            singletonSpaces.set(ordinals.nth(0), space);
+          }
+        } else {
+          // Next, take a space that was done last time, generate its
+          // successors, and add the interesting ones to the space queue.
+          for (;;) {
+            final Space doneSpace = doneQueue.poll();
+            if (doneSpace == null) {
+              // There are no more done spaces. We're done.
+              return spaces;
+            }
+            if (doneSpace.columnOrdinals.cardinality() > 4) {
+              // Do not generate successors for groups with lots of columns,
+              // probably initial groups
+              continue;
+            }
+            for (Column column : columns) {
+              if (!doneSpace.columnOrdinals.get(column.ordinal)) {
+                if (pass == 0
+                    || doneSpace.columnOrdinals.cardinality() == 0
+                    || !containsKey(
+                        doneSpace.columnOrdinals.set(column.ordinal))
+                    && predicate.apply(Pair.of(doneSpace, column))) {
+                  final ImmutableBitSet nextOrdinals =
+                      doneSpace.columnOrdinals.set(column.ordinal);
+                  if (resultSet.add(nextOrdinals)) {
+                    spaceQueue.add(nextOrdinals);
+                  }
+                }
+              }
+            }
+            // We've converted at a space into at least one interesting
+            // successor.
+            if (!spaceQueue.isEmpty()) {
+              continue loop;
+            }
+          }
+        }
+      }
+    }
+
+    private boolean containsKey(ImmutableBitSet ordinals) {
+      for (ImmutableBitSet keyOrdinals : keyOrdinalLists) {
+        if (ordinals.contains(keyOrdinals)) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    void pass(int pass, List<Space> spaces, Iterable<List<Comparable>> rows) {
+      if (CalcitePrepareImpl.DEBUG) {
+        System.out.println("pass: " + pass
+            + ", spaces.size: " + spaces.size()
+            + ", distributions.size: " + distributions.size());
+      }
+
+      for (Space space : spaces) {
+        space.collector = Collector.create(space, 1000);
+      }
+
+      int rowCount = 0;
+      for (final List<Comparable> row : rows) {
+        ++rowCount;
+        for (Space space : spaces) {
+          space.collector.add(row);
+        }
+      }
+
+      // Populate unique keys.
+      // If [x, y] is a key,
+      // then [x, y, z] is a non-minimal key (therefore not interesting),
+      // and [x, y] => [a] is a functional dependency but not interesting,
+      // and [x, y, z] is not an interesting distribution.
+      for (Space space : spaces) {
+        space.collector.finish();
+        space.collector = null;
+//        results.add(space);
+
+        int nonMinimal = 0;
+      dependents:
+        for (Space s : results.getDescendants(space)) {
+          if (s.cardinality == space.cardinality) {
+            // We have discovered a sub-set that has the same cardinality.
+            // The column(s) that are not in common are functionally
+            // dependent.
+            final ImmutableBitSet dependents =
+                space.columnOrdinals.except(s.columnOrdinals);
+            for (int i : s.columnOrdinals) {
+              final Space s1 = singletonSpaces.get(i);
+              final ImmutableBitSet rest = s.columnOrdinals.clear(i);
+              for (ImmutableBitSet dependent : s1.dependents) {
+                if (rest.contains(dependent)) {
+                  // The "key" of this functional dependency is not minimal.
+                  // For instance, if we know that
+                  //   (a) -> x
+                  // then
+                  //   (a, b, x) -> y
+                  // is not minimal; we could say the same with a smaller key:
+                  //   (a, b) -> y
+                  ++nonMinimal;
+                  continue dependents;
+                }
+              }
+            }
+            for (int dependent : dependents) {
+              final Space s1 = singletonSpaces.get(dependent);
+              for (ImmutableBitSet d : s1.dependents) {
+                if (s.columnOrdinals.contains(d)) {
+                  ++nonMinimal;
+                  continue dependents;
+                }
+              }
+            }
+            space.dependencies.or(dependents.toBitSet());
+            for (int d : dependents) {
+              singletonSpaces.get(d).dependents.add(s.columnOrdinals);
+            }
+          }
+        }
+        if (nonMinimal > 0) {
+          continue;
+        }
+        final String s = space.columns.toString(); // for debug
+        Util.discard(s);
+        double expectedCardinality =
+            expectedCardinality(rowCount, space.columnOrdinals);
+
+        final boolean minimal = nonMinimal == 0
+            && !space.unique
+            && !containsKey(space.columnOrdinals);
+        space.expectedCardinality = expectedCardinality;
+        if (minimal) {
+          final Distribution distribution =
+              new Distribution(space.columns, space.valueSet, space.cardinality,
+                  space.nullCount, expectedCardinality, minimal);
+          final double surprise = distribution.surprise();
+          if (CalcitePrepareImpl.DEBUG && surprise > 0.1d) {
+            System.out.println(distribution.columnOrdinals()
+                + " " + distribution.columns
+                + ", cardinality: " + distribution.cardinality
+                + ", expected: " + distribution.expectedCardinality
+                + ", surprise: " + distribution.surprise());
+          }
+          if (surprises.offer(surprise)) {
+            distributions.put(space.columnOrdinals, distribution);
+            keyPoset.add(space.columnOrdinals);
+            doneQueue.add(space);
+          }
+        }
+
+        if (space.cardinality == rowCount) {
+          // We have discovered a new key. It is not a super-set of a key.
+          uniques.add(new Unique(space.columns));
+          keyOrdinalLists.add(space.columnOrdinals);
+          space.unique = true;
+        }
+      }
+
+      if (pass == 0) {
+        this.rowCount = rowCount;
+      }
+    }
+
+    /** Estimates the cardinality of a collection of columns represented by
+     * {@code columns}, drawing on existing distributions. */
+    private double cardinality(double rowCount, ImmutableBitSet columns) {
+      final Distribution distribution = distributions.get(columns);
+      if (distribution != null) {
+        return distribution.cardinality;
+      } else {
+        return expectedCardinality(rowCount, columns);
+      }
+    }
+
+    /** Estimates the cardinality of a collection of columns represented by
+     * {@code columns}, drawing on existing distributions. Does not
+     * look in the distribution map for this column set. */
+    private double expectedCardinality(double rowCount,
+        ImmutableBitSet columns) {
+      switch (columns.cardinality()) {
+      case 0:
+        return 1d;
+      case 1:
+        return rowCount;
+      default:
+        double c = rowCount;
+        for (ImmutableBitSet bitSet : keyPoset.getParents(columns, true)) {
+          if (bitSet.isEmpty()) {
+            // If the parent is the empty group (i.e. "GROUP BY ()", the grand
+            // total) we cannot improve on the estimate.
+            continue;
+          }
+          final Distribution d1 = distributions.get(bitSet);
+          final double c2 = cardinality(rowCount, columns.except(bitSet));
+          final double d = Lattice.getRowCount(rowCount, d1.cardinality, c2);
+          c = Math.min(c, d);
+        }
+        for (ImmutableBitSet bitSet : keyPoset.getChildren(columns, true)) {
+          final Distribution d1 = distributions.get(bitSet);
+          c = Math.min(c, d1.cardinality);
+        }
+        return c;
+      }
+    }
+
+    private ImmutableSortedSet<Column> toColumns(Iterable<Integer> ordinals) {
+      return ImmutableSortedSet.copyOf(Iterables.transform(ordinals, get));
+    }
+  }
+
+  /** Work space for a particular combination of columns. */
+  static class Space {
+    private final Run run;
+    final ImmutableBitSet columnOrdinals;
+    final ImmutableSortedSet<Column> columns;
+    boolean unique;
+    final BitSet dependencies = new BitSet();
+    final Set<ImmutableBitSet> dependents = new HashSet<>();
+    double expectedCardinality;
+    Collector collector;
+    /** Assigned by {@link Collector#finish()}. */
+    int nullCount;
+    /** Number of distinct values. Null is counted as a value, if present.
+     * Assigned by {@link Collector#finish()}. */
+    int cardinality;
+    /** Assigned by {@link Collector#finish()}. */
+    SortedSet<Comparable> valueSet;
+
+    Space(Run run, ImmutableBitSet columnOrdinals, Iterable<Column> columns) {
+      this.run = run;
+      this.columnOrdinals = columnOrdinals;
+      this.columns = ImmutableSortedSet.copyOf(columns);
+    }
+
+    @Override public int hashCode() {
+      return columnOrdinals.hashCode();
+    }
+
+    @Override public boolean equals(Object o) {
+      return o == this
+          || o instanceof Space
+          && columnOrdinals.equals(((Space) o).columnOrdinals);
+    }
+
+    /** Returns the distribution created from this space, or null if no
+     * distribution has been registered yet. */
+    public Distribution distribution() {
+      return run.distributions.get(columnOrdinals);
+    }
+
+    double surprise() {
+      return SimpleProfiler.surprise(expectedCardinality, cardinality);
+    }
+  }
+
+  /** Builds a {@link org.apache.calcite.profile.ProfilerImpl}. */
+  public static class Builder {
+    int combinationsPerPass = 100;
+    Predicate<Pair<Space, Column>> predicate = Predicates.alwaysTrue();
+
+    public ProfilerImpl build() {
+      return new ProfilerImpl(combinationsPerPass, 200, predicate);
+    }
+
+    public Builder withPassSize(int passSize) {
+      this.combinationsPerPass = passSize;
+      return this;
+    }
+
+    public Builder withMinimumSurprise(final double v) {
+      predicate =
+          new PredicateImpl<Pair<Space, Column>>() {
+            public boolean test(Pair<Space, Column> spaceColumnPair) {
+              // Only consider combinations whose space is sufficiently
+              // surprising.
+              final Space space = spaceColumnPair.left;
+              return space.surprise() >= v;
+            }
+          };
+      return this;
+    }
+  }
+
+  /** Collects values of a column or columns. */
+  abstract static class Collector {
+    protected final Space space;
+
+    Collector(Space space) {
+      this.space = space;
+    }
+
+    abstract void add(List<Comparable> row);
+    abstract void finish();
+
+    /** Creates an initial collector of the appropriate kind. */
+    public static Collector create(Space space, int sketchThreshold) {
+      final List<Integer> columnOrdinalList = space.columnOrdinals.asList();
+      if (columnOrdinalList.size() == 1) {
+        return new SingletonCollector(space, columnOrdinalList.get(0),
+            sketchThreshold);
+      } else {
+        return new CompositeCollector(space,
+            (int[]) Primitive.INT.toArray(columnOrdinalList), sketchThreshold);
+      }
+    }
+  }
+
+  /** Collector that collects values of a single column. */
+  static class SingletonCollector extends Collector {
+    final SortedSet<Comparable> values = new TreeSet<>();
+    final int columnOrdinal;
+    final int sketchThreshold;
+    int nullCount = 0;
+
+    SingletonCollector(Space space, int columnOrdinal, int sketchThreshold) {
+      super(space);
+      this.columnOrdinal = columnOrdinal;
+      this.sketchThreshold = sketchThreshold;
+    }
+
+    public void add(List<Comparable> row) {
+      final Comparable v = row.get(columnOrdinal);
+      if (v == NullSentinel.INSTANCE) {
+        nullCount++;
+      } else {
+        if (values.add(v) && values.size() == sketchThreshold) {
+          // Too many values. Switch to a sketch collector.
+          final HllSingletonCollector collector =
+              new HllSingletonCollector(space, columnOrdinal);
+          for (Comparable value : values) {
+            collector.add(value);
+          }
+          space.collector = collector;
+        }
+      }
+    }
+
+    public void finish() {
+      space.nullCount = nullCount;
+      space.cardinality = values.size() + (nullCount > 0 ? 1 : 0);
+      space.valueSet = values.size() < 20 ? values : null;
+    }
+  }
+
+  /** Collector that collects two or more column values in a tree set. */
+  static class CompositeCollector extends Collector {
+    // Debugging hook: a column combination to trap; the calls to
+    // Util.discard below are convenient breakpoint sites.
+    protected static final ImmutableBitSet OF = ImmutableBitSet.of(2, 13);
+    final Set<FlatLists.ComparableList> values = new HashSet<>();
+    final int[] columnOrdinals;
+    final Comparable[] columnValues;
+    int nullCount = 0;
+    private final int sketchThreshold;
+
+    CompositeCollector(Space space, int[] columnOrdinals, int sketchThreshold) {
+      super(space);
+      this.columnOrdinals = columnOrdinals;
+      this.columnValues = new Comparable[columnOrdinals.length];
+      this.sketchThreshold = sketchThreshold;
+    }
+
+    public void add(List<Comparable> row) {
+      if (space.columnOrdinals.equals(OF)) {
+        Util.discard(0);
+      }
+      int nullCountThisRow = 0;
+      for (int i = 0, length = columnOrdinals.length; i < length; i++) {
+        final Comparable value = row.get(columnOrdinals[i]);
+        if (value == NullSentinel.INSTANCE) {
+          if (nullCountThisRow++ == 0) {
+            nullCount++;
+          }
+        }
+        columnValues[i] = value;
+      }
+      //noinspection unchecked
+      if (((Set) values).add(FlatLists.copyOf(columnValues))
+          && values.size() == sketchThreshold) {
+        // Too many values. Switch to a sketch collector.
+        final HllCompositeCollector collector =
+            new HllCompositeCollector(space, columnOrdinals);
+        final List<Comparable> list =
+            new ArrayList<>(
+                Collections.nCopies(columnOrdinals[columnOrdinals.length - 1]
+                        + 1,
+                    (Comparable) null));
+        for (FlatLists.ComparableList value : this.values) {
+          for (int i = 0; i < value.size(); i++) {
+            Comparable c = (Comparable) value.get(i);
+            list.set(columnOrdinals[i], c);
+          }
+          collector.add(list);
+        }
+        space.collector = collector;
+      }
+    }
+
+    public void finish() {
+      // number of input rows (not distinct values)
+      // that were null or partially null
+      space.nullCount = nullCount;
+      space.cardinality = values.size() + (nullCount > 0 ? 1 : 0);
+      space.valueSet = null;
+    }
+
+  }
+
+  /** Collector that collects two or more column values into a HyperLogLog
+   * sketch. */
+  abstract static class HllCollector extends Collector {
+    final HllSketch sketch;
+    int nullCount = 0;
+
+    static final long[] NULL_BITS = {0x9f77d57e93167a16L};
+
+    HllCollector(Space space) {
+      super(space);
+      this.sketch = HllSketch.builder().build();
+    }
+
+    protected void add(Comparable value) {
+      if (value == NullSentinel.INSTANCE) {
+        sketch.update(NULL_BITS);
+      } else if (value instanceof String) {
+        sketch.update((String) value);
+      } else if (value instanceof Double) {
+        sketch.update((Double) value);
+      } else if (value instanceof Float) {
+        sketch.update((Float) value);
+      } else if (value instanceof Long) {
+        sketch.update((Long) value);
+      } else if (value instanceof Number) {
+        sketch.update(((Number) value).longValue());
+      } else {
+        sketch.update(value.toString());
+      }
+    }
+
+    public void finish() {
+      space.nullCount = nullCount;
+      space.cardinality = (int) sketch.getEstimate();
+      space.valueSet = null;
+    }
+  }
+
+  /** Collector that collects one column value into a HyperLogLog sketch. */
+  static class HllSingletonCollector extends HllCollector {
+    final int columnOrdinal;
+
+    HllSingletonCollector(Space space, int columnOrdinal) {
+      super(space);
+      this.columnOrdinal = columnOrdinal;
+    }
+
+    public void add(List<Comparable> row) {
+      final Comparable value = row.get(columnOrdinal);
+      if (value == NullSentinel.INSTANCE) {
+        nullCount++;
+        sketch.update(NULL_BITS);
+      } else {
+        add(value);
+      }
+    }
+  }
+
+  /** Collector that collects two or more column values into a HyperLogLog
+   * sketch. */
+  static class HllCompositeCollector extends HllCollector {
+    private final int[] columnOrdinals;
+    private final ByteBuffer buf = ByteBuffer.allocate(1024);
+
+    HllCompositeCollector(Space space, int[] columnOrdinals) {
+      super(space);
+      this.columnOrdinals = columnOrdinals;
+    }
+
+    public void add(List<Comparable> row) {
+      if (space.columnOrdinals.equals(OF)) {
+        Util.discard(0);
+      }
+      int nullCountThisRow = 0;
+      buf.clear();
+      for (int columnOrdinal : columnOrdinals) {
+        final Comparable value = row.get(columnOrdinal);
+        if (value == NullSentinel.INSTANCE) {
+          if (nullCountThisRow++ == 0) {
+            nullCount++;
+          }
+          buf.put((byte) 0);
+        } else if (value instanceof String) {
+          buf.put((byte) 1)
+              .put(((String) value).getBytes(StandardCharsets.UTF_8));
+        } else if (value instanceof Double) {
+          buf.put((byte) 2).putDouble((Double) value);
+        } else if (value instanceof Float) {
+          buf.put((byte) 3).putFloat((Float) value);
+        } else if (value instanceof Long) {
+          buf.put((byte) 4).putLong((Long) value);
+        } else if (value instanceof Integer) {
+          buf.put((byte) 5).putInt((Integer) value);
+        } else if (value instanceof Boolean) {
+          buf.put((Boolean) value ? (byte) 6 : (byte) 7);
+        } else {
+          buf.put((byte) 8)
+              .put(value.toString().getBytes(StandardCharsets.UTF_8));
+        }
+      }
+      sketch.update(Arrays.copyOf(buf.array(), buf.position()));
+    }
+  }
+
+  /** Holds the last N surprise values. Accepts a new value if we are still
+   * in the warm-up period, or if it is greater than the smallest value
+   * currently held (the minimum of the last N accepted values). */
+  static class SurpriseQueue {
+    private final int warmUpCount;
+    private final int size;
+    int count = 0;
+    final Deque<Double> deque = new ArrayDeque<>();
+    final PriorityQueue<Double> priorityQueue =
+        new PriorityQueue<>(11, Ordering.<Double>natural());
+
+    SurpriseQueue(int warmUpCount, int size) {
+      this.warmUpCount = warmUpCount;
+      this.size = size;
+      Preconditions.checkArgument(warmUpCount > 3);
+      Preconditions.checkArgument(size > 0);
+    }
+
+    @Override public String toString() {
+      return "min: " + priorityQueue.peek()
+          + ", contents: " + deque.toString();
+    }
+
+    boolean isValid() {
+      if (CalcitePrepareImpl.DEBUG) {
+        System.out.println(toString());
+      }
+      assert deque.size() == priorityQueue.size();
+      if (count > size) {
+        assert deque.size() == size;
+      }
+      return true;
+    }
+
+    boolean offer(double d) {
+      boolean b;
+      if (count++ < warmUpCount || d > priorityQueue.peek()) {
+        if (priorityQueue.size() >= size) {
+          priorityQueue.remove(deque.pop());
+        }
+        priorityQueue.add(d);
+        deque.add(d);
+        b = true;
+      } else {
+        b = false;
+      }
+      if (CalcitePrepareImpl.DEBUG) {
+        System.out.println("offer " + d
+            + " min " + priorityQueue.peek()
+            + " accepted " + b);
+      }
+      return b;
+    }
+  }
+}
+
+// End ProfilerImpl.java
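
The acceptance rule implemented by SurpriseQueue above can be exercised in
isolation. The following is a minimal, self-contained sketch of that rule
(the class, constants and sample values are illustrative, not part of the
patch): every value is accepted during the warm-up; afterwards a value is
accepted only if it exceeds the smallest of the last N accepted values, and
the oldest accepted value is evicted to make room.

  import java.util.ArrayDeque;
  import java.util.Deque;
  import java.util.PriorityQueue;

  /** Stand-alone sketch of the SurpriseQueue acceptance rule. */
  public class SurpriseQueueSketch {
    public static void main(String[] args) {
      final int warmUpCount = 4; // accept everything for the first 4 offers
      final int size = 3;        // keep the last 3 accepted values
      final Deque<Double> deque = new ArrayDeque<>();              // FIFO of accepted values
      final PriorityQueue<Double> minHeap = new PriorityQueue<>(); // same values, minimum on top
      int count = 0;
      for (double d : new double[] {0.9, 0.1, 0.2, 0.3, 0.05, 0.25, 0.5}) {
        final boolean accepted;
        if (count++ < warmUpCount || d > minHeap.peek()) {
          if (minHeap.size() >= size) {
            minHeap.remove(deque.pop()); // evict the oldest accepted value
          }
          minHeap.add(d);
          deque.add(d);
          accepted = true;
        } else {
          accepted = false;
        }
        System.out.println("offer " + d + ", accepted " + accepted
            + ", min " + minHeap.peek());
      }
    }
  }

Running the sketch shows that once the warm-up ends, 0.05 is rejected (it
does not beat the current minimum, 0.1), while 0.25 and 0.5 are accepted
and displace the oldest entries.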

http://git-wip-us.apache.org/repos/asf/calcite/blob/dad58186/core/src/main/java/org/apache/calcite/profile/SimpleProfiler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/profile/SimpleProfiler.java b/core/src/main/java/org/apache/calcite/profile/SimpleProfiler.java
new file mode 100644
index 0000000..c9b00d0
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/profile/SimpleProfiler.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.profile;
+
+import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.materialize.Lattice;
+import org.apache.calcite.rel.metadata.NullSentinel;
+import org.apache.calcite.runtime.FlatLists;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.PartiallyOrderedSet;
+import org.apache.calcite.util.Util;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.collect.Iterables;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import javax.annotation.Nonnull;
+
+/**
+ * Basic implementation of {@link Profiler}.
+ */
+public class SimpleProfiler implements Profiler {
+  private static final Function<List<Comparable>, Comparable> ONLY =
+      new Function<List<Comparable>, Comparable>() {
+        public Comparable apply(List<Comparable> input) {
+          return Iterables.getOnlyElement(input);
+        }
+      };
+
+  public Profile profile(Iterable<List<Comparable>> rows,
+      final List<Column> columns, Collection<ImmutableBitSet> initialGroups) {
+    Util.discard(initialGroups); // this profiler ignores initial groups
+    return new Run(columns).profile(rows);
+  }
+
+  /** Returns a measure of how much an actual value differs from expected.
+   * The formula is {@code abs(expected - actual) / (expected + actual)}.
+   *
+   * <p>Examples:<ul>
+   *   <li>surprise(e, a) is always between 0 and 1;
+   *   <li>surprise(e, a) is 0 if e = a;
+   *   <li>surprise(e, 0) is 1 if e &gt; 0;
+   *   <li>surprise(0, a) is 1 if a &gt; 0;
+   *   <li>surprise(5, 0) is 100%;
+   *   <li>surprise(5, 3) is 25%;
+   *   <li>surprise(5, 4) is 11%;
+   *   <li>surprise(5, 5) is 0%;
+   *   <li>surprise(5, 6) is 9%;
+   *   <li>surprise(5, 16) is 52%;
+   *   <li>surprise(5, 100) is 90%;
+   * </ul>
+   *
+   * @param expected Expected value
+   * @param actual Actual value
+   * @return Measure of how much expected deviates from actual
+   */
+  public static double surprise(double expected, double actual) {
+    if (expected == actual) {
+      return 0d;
+    }
+    final double sum = expected + actual;
+    if (sum <= 0d) {
+      return 1d;
+    }
+    return Math.abs(expected - actual) / sum;
+  }
+
+  /** A run of the profiler. */
+  static class Run {
+    private final List<Column> columns;
+    final List<Space> spaces = new ArrayList<>();
+    final List<Space> singletonSpaces;
+    final List<Statistic> statistics = new ArrayList<>();
+    final PartiallyOrderedSet.Ordering<Space> ordering =
+        new PartiallyOrderedSet.Ordering<Space>() {
+          public boolean lessThan(Space e1, Space e2) {
+            return e2.columnOrdinals.contains(e1.columnOrdinals);
+          }
+        };
+    final PartiallyOrderedSet<Space> results =
+        new PartiallyOrderedSet<>(ordering);
+    final PartiallyOrderedSet<Space> keyResults =
+        new PartiallyOrderedSet<>(ordering);
+    private final List<ImmutableBitSet> keyOrdinalLists =
+        new ArrayList<>();
+    final Function<Integer, Column> get =
+        new Function<Integer, Column>() {
+          public Column apply(Integer input) {
+            return columns.get(input);
+          }
+        };
+
+    Run(final List<Column> columns) {
+      for (Ord<Column> column : Ord.zip(columns)) {
+        if (column.e.ordinal != column.i) {
+          throw new IllegalArgumentException();
+        }
+      }
+      this.columns = columns;
+      this.singletonSpaces =
+          new ArrayList<>(Collections.nCopies(columns.size(), (Space) null));
+      for (ImmutableBitSet ordinals
+          : ImmutableBitSet.range(columns.size()).powerSet()) {
+        final Space space = new Space(ordinals, toColumns(ordinals));
+        spaces.add(space);
+        if (ordinals.cardinality() == 1) {
+          singletonSpaces.set(ordinals.nth(0), space);
+        }
+      }
+    }
+
+    Profile profile(Iterable<List<Comparable>> rows) {
+      final List<Comparable> values = new ArrayList<>();
+      int rowCount = 0;
+      for (final List<Comparable> row : rows) {
+        ++rowCount;
+      joint:
+        for (Space space : spaces) {
+          values.clear();
+          for (Column column : space.columns) {
+            final Comparable value = row.get(column.ordinal);
+            values.add(value);
+            if (value == NullSentinel.INSTANCE) {
+              space.nullCount++;
+              continue joint;
+            }
+          }
+          space.values.add(FlatLists.ofComparable(values));
+        }
+      }
+
+      // Populate unique keys.
+      // If [x, y] is a key,
+      // then [x, y, z] is a non-minimal key (therefore not interesting),
+      // and [x, y] => [a] is a functional dependency but not interesting,
+      // and [x, y, z] is not an interesting distribution.
+      final Map<ImmutableBitSet, Distribution> distributions = new HashMap<>();
+      for (Space space : spaces) {
+        if (space.values.size() == rowCount
+            && !containsKey(space.columnOrdinals, false)) {
+          // We have discovered a new key.
+          // It is not an existing key or a super-set of a key.
+          statistics.add(new Unique(space.columns));
+          space.unique = true;
+          keyOrdinalLists.add(space.columnOrdinals);
+        }
+
+        int nonMinimal = 0;
+      dependents:
+        for (Space s : results.getDescendants(space)) {
+          if (s.cardinality() == space.cardinality()) {
+            // We have discovered a sub-set that has the same cardinality.
+            // The column(s) that are not in common are functionally
+            // dependent.
+            final ImmutableBitSet dependents =
+                space.columnOrdinals.except(s.columnOrdinals);
+            for (int i : s.columnOrdinals) {
+              final Space s1 = singletonSpaces.get(i);
+              final ImmutableBitSet rest = s.columnOrdinals.clear(i);
+              for (ImmutableBitSet dependent : s1.dependents) {
+                if (rest.contains(dependent)) {
+                  // The "key" of this functional dependency is not minimal.
+                  // For instance, if we know that
+                  //   (a) -> x
+                  // then
+                  //   (a, b, x) -> y
+                  // is not minimal; we could say the same with a smaller key:
+                  //   (a, b) -> y
+                  ++nonMinimal;
+                  continue dependents;
+                }
+              }
+            }
+            for (int dependent : dependents) {
+              final Space s1 = singletonSpaces.get(dependent);
+              for (ImmutableBitSet d : s1.dependents) {
+                if (s.columnOrdinals.contains(d)) {
+                  ++nonMinimal;
+                  continue dependents;
+                }
+              }
+            }
+            space.dependencies.or(dependents.toBitSet());
+            for (int d : dependents) {
+              singletonSpaces.get(d).dependents.add(s.columnOrdinals);
+            }
+          }
+        }
+
+        int nullCount;
+        final SortedSet<Comparable> valueSet;
+        if (space.columns.size() == 1) {
+          nullCount = space.nullCount;
+          valueSet = ImmutableSortedSet.copyOf(
+              Iterables.transform(space.values, ONLY));
+        } else {
+          nullCount = -1;
+          valueSet = null;
+        }
+        double expectedCardinality;
+        final double cardinality = space.cardinality();
+        switch (space.columns.size()) {
+        case 0:
+          expectedCardinality = 1d;
+          break;
+        case 1:
+          expectedCardinality = rowCount;
+          break;
+        default:
+          expectedCardinality = rowCount;
+          for (Column column : space.columns) {
+            final Distribution d1 =
+                distributions.get(ImmutableBitSet.of(column.ordinal));
+            final Distribution d2 =
+                distributions.get(space.columnOrdinals.clear(column.ordinal));
+            final double d =
+                Lattice.getRowCount(rowCount, d1.cardinality, d2.cardinality);
+            expectedCardinality = Math.min(expectedCardinality, d);
+          }
+        }
+        final boolean minimal = nonMinimal == 0
+            && !space.unique
+            && !containsKey(space.columnOrdinals, true);
+        final Distribution distribution =
+            new Distribution(space.columns, valueSet, cardinality, nullCount,
+                expectedCardinality, minimal);
+        statistics.add(distribution);
+        distributions.put(space.columnOrdinals, distribution);
+
+        if (distribution.minimal) {
+          results.add(space);
+        }
+      }
+
+      for (Space s : singletonSpaces) {
+        for (ImmutableBitSet dependent : s.dependents) {
+          if (!containsKey(dependent, false)
+              && !hasNull(dependent)) {
+            statistics.add(
+                new FunctionalDependency(toColumns(dependent),
+                    Iterables.getOnlyElement(s.columns)));
+          }
+        }
+      }
+      return new Profile(columns, new RowCount(rowCount),
+          Iterables.filter(statistics, FunctionalDependency.class),
+          Iterables.filter(statistics, Distribution.class),
+          Iterables.filter(statistics, Unique.class));
+    }
+
+    /** Returns whether a set of column ordinals
+     * matches or contains a unique key.
+     * If {@code strict}, it must be a proper super-set of a unique key. */
+    private boolean containsKey(ImmutableBitSet ordinals, boolean strict) {
+      for (ImmutableBitSet keyOrdinals : keyOrdinalLists) {
+        if (ordinals.contains(keyOrdinals)) {
+          return !(strict && keyOrdinals.equals(ordinals));
+        }
+      }
+      return false;
+    }
+
+    private boolean hasNull(ImmutableBitSet columnOrdinals) {
+      for (Integer columnOrdinal : columnOrdinals) {
+        if (singletonSpaces.get(columnOrdinal).nullCount > 0) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    private ImmutableSortedSet<Column> toColumns(Iterable<Integer> ordinals) {
+      return ImmutableSortedSet.copyOf(Iterables.transform(ordinals, get));
+    }
+  }
+
+  /** Work space for a particular combination of columns. */
+  static class Space implements Comparable<Space> {
+    final ImmutableBitSet columnOrdinals;
+    final ImmutableSortedSet<Column> columns;
+    int nullCount;
+    final SortedSet<FlatLists.ComparableList<Comparable>> values =
+        new TreeSet<>();
+    boolean unique;
+    final BitSet dependencies = new BitSet();
+    final Set<ImmutableBitSet> dependents = new HashSet<>();
+
+    Space(ImmutableBitSet columnOrdinals, Iterable<Column> columns) {
+      this.columnOrdinals = columnOrdinals;
+      this.columns = ImmutableSortedSet.copyOf(columns);
+    }
+
+    @Override public int hashCode() {
+      return columnOrdinals.hashCode();
+    }
+
+    @Override public boolean equals(Object o) {
+      return o == this
+          || o instanceof Space
+          && columnOrdinals.equals(((Space) o).columnOrdinals);
+    }
+
+    public int compareTo(@Nonnull Space o) {
+      return columnOrdinals.equals(o.columnOrdinals) ? 0
+          : columnOrdinals.contains(o.columnOrdinals) ? 1
+              : -1;
+    }
+
+    /** Number of distinct values. Null is counted as a value, if present. */
+    public double cardinality() {
+      return values.size() + (nullCount > 0 ? 1 : 0);
+    }
+  }
+}
+
+// End SimpleProfiler.java
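
To make the surprise metric concrete, here is a small, self-contained
sketch (the class name is illustrative) that reproduces the figures listed
in the javadoc of SimpleProfiler.surprise, using the same formula,
abs(expected - actual) / (expected + actual):

  /** Stand-alone check of the surprise formula. */
  public class SurpriseSketch {
    static double surprise(double expected, double actual) {
      if (expected == actual) {
        return 0d;
      }
      final double sum = expected + actual;
      if (sum <= 0d) {
        return 1d;
      }
      return Math.abs(expected - actual) / sum;
    }

    public static void main(String[] args) {
      System.out.println(surprise(5, 0));   // 1.0   (100%)
      System.out.println(surprise(5, 3));   // 0.25  (25%)
      System.out.println(surprise(5, 4));   // 0.111 (11%)
      System.out.println(surprise(5, 5));   // 0.0   (0%)
      System.out.println(surprise(5, 16));  // 0.524 (52%)
      System.out.println(surprise(5, 100)); // 0.905 (90%)
    }
  }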

http://git-wip-us.apache.org/repos/asf/calcite/blob/dad58186/core/src/main/java/org/apache/calcite/profile/package-info.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/profile/package-info.java b/core/src/main/java/org/apache/calcite/profile/package-info.java
new file mode 100644
index 0000000..c30e9e0
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/profile/package-info.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Utilities to analyze data sets.
+ */
+@PackageMarker
+package org.apache.calcite.profile;
+
+import org.apache.calcite.avatica.util.PackageMarker;
+
+// End package-info.java
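
Both profilers infer functional dependencies from cardinalities alone: if a
column set S and the set S plus a column c have the same number of distinct
combinations, then S determines c. The sketch below illustrates that test on
a handful of in-memory rows; all names and data are hypothetical, and it is
independent of the profiler classes above:

  import java.util.Arrays;
  import java.util.HashSet;
  import java.util.List;
  import java.util.Set;

  /** Stand-alone illustration of detecting a functional dependency by
   * comparing distinct counts. */
  public class DependencySketch {
    public static void main(String[] args) {
      // Rows of (deptno, deptName, empName); deptno -> deptName holds.
      List<List<String>> rows = Arrays.asList(
          Arrays.asList("10", "Sales", "Alice"),
          Arrays.asList("10", "Sales", "Bob"),
          Arrays.asList("20", "Marketing", "Carol"));
      System.out.println(determines(rows, 0, 1)); // true:  deptno determines deptName
      System.out.println(determines(rows, 0, 2)); // false: deptno does not determine empName
    }

    /** Returns whether column {@code key} functionally determines column
     * {@code dep}: adding {@code dep} must not increase the distinct count. */
    static boolean determines(List<List<String>> rows, int key, int dep) {
      Set<List<String>> keyValues = new HashSet<>();
      Set<List<String>> bothValues = new HashSet<>();
      for (List<String> row : rows) {
        keyValues.add(Arrays.asList(row.get(key)));
        bothValues.add(Arrays.asList(row.get(key), row.get(dep)));
      }
      return keyValues.size() == bothValues.size();
    }
  }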