You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by gr...@apache.org on 2019/02/13 17:59:06 UTC

[kudu] 01/03: KUDU-2674: [java] Add a Java KuduPartitioner API

This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 3db5c2151fb99c9ca834d6651a893610bc6e4ccd
Author: Grant Henke <gr...@apache.org>
AuthorDate: Fri Jan 25 12:43:34 2019 -0600

    KUDU-2674: [java] Add a Java KuduPartitioner API
    
    This patch is a Java port of  the c++ KuduPartitioner API
    introduced in KUDU-1713 (https://gerrit.cloudera.org/#/c/5775/).
    
    The API allows a client to determine which partition a
    row falls into without actually writing that row. This would
    allow Spark and other Java integrations to repartition and
    pre-sort the data before writing to Kudu.
    
    This patch also fixes a bug where calls to
    AsyncKuduClient.locateTable could take much
    longer than the specified timeout. The timeout
    was not propogated to subsequent locateTablet
    call and each locateTablet used the default
    admin operation timeout as a result.
    
    Change-Id: I7a2d47aab5318c0b6d29a8cb2b073c05bc1b6478
    Reviewed-on: http://gerrit.cloudera.org:8080/12275
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
---
 .../org/apache/kudu/client/AsyncKuduClient.java    |  34 +++-
 .../org/apache/kudu/client/AsyncKuduSession.java   |   2 +
 .../org/apache/kudu/client/KuduPartitioner.java    | 180 +++++++++++++++++
 .../apache/kudu/client/TestKuduPartitioner.java    | 214 +++++++++++++++++++++
 src/kudu/client/client-test.cc                     |   6 +-
 src/kudu/client/client.h                           |   2 +-
 6 files changed, 430 insertions(+), 8 deletions(-)

diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 2ec8719..494cb4d 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -1821,7 +1821,12 @@ public class AsyncKuduClient implements AutoCloseable {
       // When lookup completes, the tablet (or non-covered range) for the next
       // partition key will be located and added to the client's cache.
       final byte[] lookupKey = partitionKey;
-      return locateTablet(table, key, fetchBatchSize, null).addCallbackDeferring(
+
+      // Build a fake RPC to encapsulate and propagate the timeout. There's no actual "RPC" to send.
+      KuduRpc fakeRpc = buildFakeRpc("loopLocateTable", null);
+      fakeRpc.setTimeoutMillis(deadlineTracker.getMillisBeforeDeadline());
+
+      return locateTablet(table, key, fetchBatchSize, fakeRpc).addCallbackDeferring(
           new Callback<Deferred<List<LocatedTablet>>, GetTableLocationsResponsePB>() {
             @Override
             public Deferred<List<LocatedTablet>> call(GetTableLocationsResponsePB resp) {
@@ -2143,15 +2148,27 @@ public class AsyncKuduClient implements AutoCloseable {
     return cache.get(partitionKey);
   }
 
+  enum LookupType {
+    // The lookup should only return a tablet which actually covers the
+    // requested partition key.
+    POINT,
+    // The lookup should return the next tablet after the requested
+    // partition key if the requested key does not fall within a covered
+    // range.
+    LOWER_BOUND
+  }
+
   /**
    * Returns a deferred containing the located tablet which covers the partition key in the table.
    * @param table the table
    * @param partitionKey the partition key of the tablet to look up in the table
+   * @param lookupType the type of lookup to use
    * @param deadline deadline in milliseconds for this lookup to finish
    * @return a deferred containing the located tablet
    */
   Deferred<LocatedTablet> getTabletLocation(final KuduTable table,
                                             final byte[] partitionKey,
+                                            final LookupType lookupType,
                                             long deadline) {
 
     // Locate the tablet at the partition key by locating tablets between
@@ -2167,6 +2184,8 @@ public class AsyncKuduClient implements AutoCloseable {
       endPartitionKey = Arrays.copyOf(partitionKey, partitionKey.length + 1);
     }
 
+    final DeadlineTracker deadlineTracker = new DeadlineTracker();
+    deadlineTracker.setDeadline(deadline);
     Deferred<List<LocatedTablet>> locatedTablets = locateTable(
         table, startPartitionKey, endPartitionKey, FETCH_TABLETS_PER_POINT_LOOKUP, deadline);
 
@@ -2191,9 +2210,16 @@ public class AsyncKuduClient implements AutoCloseable {
                     "Table location expired before it could be processed")));
               }
               if (entry.isNonCoveredRange()) {
-                return Deferred.fromError(
-                    new NonCoveredRangeException(entry.getLowerBoundPartitionKey(),
-                                                 entry.getUpperBoundPartitionKey()));
+                if (lookupType == LookupType.POINT
+                    || entry.getUpperBoundPartitionKey().length == 0) {
+                  return Deferred.fromError(
+                      new NonCoveredRangeException(entry.getLowerBoundPartitionKey(),
+                          entry.getUpperBoundPartitionKey()));
+                }
+                // This is a LOWER_BOUND lookup, get the tablet location from the upper bound key
+                // of the non-covered range to return the next valid tablet location.
+                return getTabletLocation(table, entry.getUpperBoundPartitionKey(),
+                    LookupType.POINT, deadlineTracker.getMillisBeforeDeadline());
               }
               return Deferred.fromResult(new LocatedTablet(entry.getTablet()));
             }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
index 7d649bd..d815fdb 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
@@ -38,6 +38,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Range;
 import com.stumbleupon.async.Callback;
 import com.stumbleupon.async.Deferred;
+import org.apache.kudu.client.AsyncKuduClient.LookupType;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.jboss.netty.util.Timeout;
@@ -566,6 +567,7 @@ public class AsyncKuduSession implements SessionConfiguration {
     // Kick off a location lookup.
     Deferred<LocatedTablet> tablet = client.getTabletLocation(operation.getTable(),
                                                               operation.partitionKey(),
+                                                              LookupType.POINT,
                                                               timeoutMs);
 
     // Holds a buffer that should be flushed outside the synchronized block, if necessary.
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduPartitioner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduPartitioner.java
new file mode 100644
index 0000000..bdadea1
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduPartitioner.java
@@ -0,0 +1,180 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import com.google.common.base.Preconditions;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * A KuduPartitioner allows clients to determine the target partition of a
+ * row without actually performing a write. The set of partitions is eagerly
+ * fetched when the KuduPartitioner is constructed so that the actual partitioning
+ * step can be performed synchronously without any network trips.
+ *
+ * NOTE: Because this operates on a metadata snapshot retrieved at construction
+ * time, it will not reflect any metadata changes to the table that have occurred
+ * since its creation.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class KuduPartitioner {
+  private static final BytesKey EMPTY = new BytesKey(new byte[0]);
+  private static final int NON_COVERED_RANGE_INDEX = -1;
+
+  private final PartitionSchema partitionSchema;
+  private final NavigableMap<BytesKey, Integer> partitionByStartKey;
+  private final int numPartitions;
+
+  KuduPartitioner(PartitionSchema partitionSchema,
+                  NavigableMap<BytesKey, Integer> partitionByStartKey,
+                  int numPartitions) {
+    this.partitionSchema = partitionSchema;
+    this.partitionByStartKey = partitionByStartKey;
+    this.numPartitions = numPartitions;
+  }
+
+  /**
+   * @return the number of partitions known by this partitioner.
+   */
+  public int numPartitions() {
+    return this.numPartitions;
+  }
+
+  /**
+   * Determine the partition index that the given row falls into.
+   *
+   * NOTE: The row must be constructed with a schema returned from the Kudu server.
+   * ex: `KuduTable.getSchema().newPartialRow();`
+   *
+   * @param row The row to be partitioned.
+   * @return The resulting partition index.
+   *         The result will be less than numPartitions()
+   * @throws NonCoveredRangeException if the row falls into a non-covered range.
+   */
+  public int partitionRow(PartialRow row) throws NonCoveredRangeException {
+    // Column IDs are required to encode the partition key.
+    Preconditions.checkArgument(row.getSchema().hasColumnIds(),
+        "The row must be constructed with a schema returned from the server. " +
+            "(ex: KuduTable.getSchema().newPartialRow();");
+    BytesKey partitionKey = new BytesKey(partitionSchema.encodePartitionKey(row));
+    // The greatest key that is less than or equal to the given key.
+    Map.Entry<BytesKey, Integer> floor = partitionByStartKey.floorEntry(partitionKey);
+    if (floor.getValue() == NON_COVERED_RANGE_INDEX) {
+      Map.Entry<BytesKey, Integer> ceiling = partitionByStartKey.ceilingEntry(partitionKey);
+      throw new NonCoveredRangeException(floor.getKey().bytes, ceiling.getKey().bytes);
+    }
+    return floor.getValue();
+  }
+
+  /**
+   * A wrapper around a byte array that implements the Comparable interface
+   * allowing it to be used as the key in map.
+   */
+  private static class BytesKey implements Comparable<BytesKey> {
+
+    private final byte[] bytes;
+
+    BytesKey(byte[] bytes) {
+      this.bytes = bytes;
+    }
+
+    public boolean isEmpty() {
+      return bytes.length == 0;
+    }
+
+    @Override
+    public int compareTo(BytesKey other) {
+      return Bytes.memcmp(this.bytes, other.bytes);
+    }
+
+    @Override
+    public String toString() {
+      return Bytes.hex(bytes);
+    }
+  }
+
+  /**
+   * A Builder class to build {@link KuduPartitioner}.
+   */
+  @InterfaceAudience.Public
+  @InterfaceStability.Evolving
+  public static class KuduPartitionerBuilder {
+
+    private final KuduTable table;
+    private long timeoutMillis;
+
+    public KuduPartitionerBuilder(KuduTable table) {
+      this.table = table;
+      this.timeoutMillis = table.getAsyncClient().getDefaultAdminOperationTimeoutMs();
+    }
+
+    /**
+     * Set the timeout used for building the {@link KuduPartitioner}.
+     * Defaults to the {@link AsyncKuduClient#getDefaultAdminOperationTimeoutMs()}.
+     * @param timeoutMillis the timeout to set in milliseconds.
+     */
+    public KuduPartitionerBuilder buildTimeout(long timeoutMillis) {
+      this.timeoutMillis = timeoutMillis;
+      return this;
+    }
+
+    /**
+     * Builds a {@link KuduPartitioner} using the passed configurations.
+     * @return a new {@link KuduPartitioner}
+     */
+    public KuduPartitioner build() throws KuduException {
+      final DeadlineTracker deadlineTracker = new DeadlineTracker();
+      deadlineTracker.setDeadline(timeoutMillis);
+      NavigableMap<BytesKey, Integer> partitionByStartKey = new TreeMap<>();
+      // Insert a sentinel for the beginning of the table, in case a user
+      // queries for any row which falls before the first partition.
+      partitionByStartKey.put(EMPTY, NON_COVERED_RANGE_INDEX);
+      BytesKey nextPartKey = EMPTY;
+      int numPartitions = 0;
+      while (true) {
+        LocatedTablet tablet;
+        try {
+          tablet = KuduClient.joinAndHandleException(
+              table.getAsyncClient().getTabletLocation(table,
+                  nextPartKey.bytes, AsyncKuduClient.LookupType.LOWER_BOUND,
+                  deadlineTracker.getMillisBeforeDeadline()));
+        } catch (NonCoveredRangeException ncr) {
+          // No more tablets
+          break;
+        }
+        BytesKey keyStart = new BytesKey(tablet.getPartition().partitionKeyStart);
+        BytesKey keyEnd = new BytesKey(tablet.getPartition().partitionKeyEnd);
+        partitionByStartKey.put(keyStart, numPartitions++);
+        if (keyEnd.isEmpty()) break;
+        // Set the start of the next non-covered range to have the NON_COVERED_RANGE_INDEX.
+        // As we process partitions, if a partition covers this range, the keyStart will be
+        // equal to this keyEnd and the NON_COVERED_RANGE_INDEX will be replaced with the index
+        // of that partition.
+        partitionByStartKey.put(keyEnd, NON_COVERED_RANGE_INDEX);
+        nextPartKey = keyEnd;
+      }
+      return new KuduPartitioner(table.getPartitionSchema(), partitionByStartKey, numPartitions);
+    }
+  }
+
+}
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduPartitioner.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduPartitioner.java
new file mode 100644
index 0000000..5fb2ea6
--- /dev/null
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduPartitioner.java
@@ -0,0 +1,214 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import org.apache.kudu.Schema;
+import org.apache.kudu.test.KuduTestHarness;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.kudu.test.ClientTestUtil.getBasicCreateTableOptions;
+import static org.apache.kudu.test.ClientTestUtil.getBasicSchema;
+import static org.apache.kudu.test.ClientTestUtil.getBasicTableOptionsWithNonCoveredRange;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestKuduPartitioner {
+  private static final Logger LOG = LoggerFactory.getLogger(TestKuduPartitioner.class);
+
+  private KuduClient client;
+
+  @Rule
+  public KuduTestHarness harness = new KuduTestHarness();
+
+  @Before
+  public void setUp() {
+    client = harness.getClient();
+  }
+
+  @Test
+  public void testPartitioner() throws Exception {
+    // Create a table with the following 9 partitions:
+    //
+    //             hash bucket
+    //   key     0      1     2
+    //         -----------------
+    //  <3333    x      x     x
+    // 3333-6666 x      x     x
+    //  >=6666   x      x     x
+    Schema basicSchema = getBasicSchema();
+    int numRanges = 3;
+    int numHashPartitions = 3;
+    String tableName = "TestPartitioner";
+
+    List<PartialRow> splitRows = new ArrayList<>();
+    for (int split : Arrays.asList(3333, 6666)) {
+      PartialRow row = basicSchema.newPartialRow();
+      row.addInt("key", split);
+      splitRows.add(row);
+    }
+
+    CreateTableOptions createOptions = new CreateTableOptions();
+    createOptions.addHashPartitions(Collections.singletonList("key"), numHashPartitions);
+    createOptions.setRangePartitionColumns(Collections.singletonList("key"));
+    for (PartialRow row : splitRows) {
+      createOptions.addSplitRow(row);
+    }
+
+    KuduTable table = client.createTable(tableName, basicSchema, createOptions);
+    Schema schema = table.getSchema();
+    KuduPartitioner part = new KuduPartitioner.KuduPartitionerBuilder(table).build();
+
+    assertEquals(numRanges * numHashPartitions, part.numPartitions());
+
+    // Partition a bunch of rows, counting how many fall into each partition.
+    int numRowsToPartition = 10000;
+    int[] countsByPartition = new int[part.numPartitions()];
+    Arrays.fill(countsByPartition, 0);
+    for (int i = 0; i < numRowsToPartition; i++) {
+      PartialRow row = schema.newPartialRow();
+      row.addInt("key", i);
+      int partIndex = part.partitionRow(row);
+      countsByPartition[partIndex]++;
+    }
+
+    // We don't expect a completely even division of rows into partitions, but
+    // we should be within 10% of that.
+    int expectedPerPartition = numRowsToPartition / part.numPartitions();
+    int fuzziness = expectedPerPartition / 10;
+    int minPerPartition = expectedPerPartition - fuzziness;
+    int maxPerPartition = expectedPerPartition + fuzziness;
+    for (int i = 0; i < part.numPartitions(); i++) {
+      assertTrue(minPerPartition <= countsByPartition[i]);
+      assertTrue(maxPerPartition >= countsByPartition[i]);
+    }
+
+    // Drop the first and third range partition.
+    AlterTableOptions alterOptions = new AlterTableOptions();
+    alterOptions.dropRangePartition(basicSchema.newPartialRow(), splitRows.get(0));
+    alterOptions.dropRangePartition(splitRows.get(1), basicSchema.newPartialRow());
+    client.alterTable(tableName, alterOptions);
+
+    // The existing partitioner should still return results based on the table
+    // state at the time it was created, and successfully return partitions
+    // for rows in the now-dropped range.
+    assertEquals(numRanges * numHashPartitions, part.numPartitions());
+    PartialRow row = schema.newPartialRow();
+    row.addInt("key", 1000);
+    assertEquals(0, part.partitionRow(row));
+
+    // If we recreate the partitioner, it should get the new partitioning info.
+    part = new KuduPartitioner.KuduPartitionerBuilder(table).build();
+    numRanges = 1;
+    assertEquals(numRanges * numHashPartitions, part.numPartitions());
+  }
+
+  @Test
+  public void testPartitionerNonCoveredRange() throws Exception {
+    Schema basicSchema = getBasicSchema();
+    int numHashPartitions = 3;
+    String tableName = "TestPartitionerNonCoveredRange";
+
+    CreateTableOptions createOptions = new CreateTableOptions();
+    createOptions.addHashPartitions(Collections.singletonList("key"), numHashPartitions);
+    createOptions.setRangePartitionColumns(Collections.singletonList("key"));
+    // Cover a range where 1000 <= key < 2000
+    PartialRow lower = basicSchema.newPartialRow();
+    lower.addInt("key", 1000);
+    PartialRow upper = basicSchema.newPartialRow();
+    upper.addInt("key", 2000);
+    createOptions.addRangePartition(lower, upper);
+
+    KuduTable table = client.createTable(tableName, basicSchema, createOptions);
+    Schema schema = table.getSchema();
+    PartitionSchema partitionSchema = table.getPartitionSchema();
+    KuduPartitioner part = new KuduPartitioner.KuduPartitionerBuilder(table).build();
+
+    try {
+      PartialRow under = schema.newPartialRow();
+      under.addInt("key", 999);
+      part.partitionRow(under);
+      fail("partitionRow did not throw a NonCoveredRangeException");
+    } catch (NonCoveredRangeException ex) {
+      // Expected
+    }
+
+    try {
+      PartialRow over = schema.newPartialRow();
+      over.addInt("key", 999);
+      part.partitionRow(over);
+      fail("partitionRow did not throw a NonCoveredRangeException");
+    } catch (NonCoveredRangeException ex) {
+      // Expected
+    }
+  }
+
+  @Test
+  public void testBuildTimeout() throws Exception {
+    Schema basicSchema = getBasicSchema();
+    String tableName = "TestBuildTimeout";
+    CreateTableOptions createOptions = new CreateTableOptions();
+    createOptions.addHashPartitions(Collections.singletonList("key"), 3);
+    createOptions.setRangePartitionColumns(Collections.singletonList("key"));
+    KuduTable table = client.createTable(tableName, basicSchema, createOptions);
+
+    // Ensure the table information can't be found to force a timeout.
+    harness.killAllMasterServers();
+
+    int timeoutMs = 2000;
+    long now = System.currentTimeMillis();
+    try {
+      KuduPartitioner partitioner = new KuduPartitioner.KuduPartitionerBuilder(table)
+          .buildTimeout(timeoutMs)
+          .build();
+      fail("No NonRecoverableException was thrown");
+    } catch (NonRecoverableException ex) {
+      assertTrue(ex.getMessage().startsWith("cannot complete before timeout"));
+    }
+    long elapsed = System.currentTimeMillis() - now;
+    assertTrue(elapsed <= timeoutMs * 1.1); // Add 10% to avoid flakiness.
+  }
+
+  @Test
+  public void testTableCache() throws Exception {
+    String tableName = "TestTableCache";
+    KuduTable table =
+        client.createTable(tableName, getBasicSchema(), getBasicTableOptionsWithNonCoveredRange());
+
+    // Populate the table cache by building the partitioner once.
+    KuduPartitioner partitioner = new KuduPartitioner.KuduPartitionerBuilder(table).build();
+
+    // Ensure the remote table information can't be found.
+    harness.killAllMasterServers();
+
+    // This partitioner should build correctly because the table cache holds the partitions
+    // from the previous partitioner.
+    KuduPartitioner partitionerFromCache = new KuduPartitioner.KuduPartitionerBuilder(table).build();
+
+    assertEquals(partitioner.numPartitions(), partitionerFromCache.numPartitions());
+  }
+}
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 1979645..d86a76a 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -5541,9 +5541,9 @@ TEST_F(ClientTest, TestPartitioner) {
   //             hash bucket
   //   key     0      1     2
   //         -----------------
-  //  <3000    x      x     x
-  // 3000-7000 x      x     x
-  //  >=7000   x      x     x
+  //  <3333    x      x     x
+  // 3333-6666 x      x     x
+  //  >=6666   x      x     x
   int num_ranges = 3;
   const int kNumHashPartitions = 3;
   const char* kTableName = "TestPartitioner";
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 09103d6..2e2136e 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -2503,7 +2503,7 @@ class KUDU_EXPORT KuduPartitioner {
   ///   The row to be partitioned.
   /// @param [out] partition
   ///   The resulting partition index, or -1 if the row falls into a
-  ///   non-covered range. The result will be less than @c NumPartitioons().
+  ///   non-covered range. The result will be less than @c NumPartitions().
   ///
   /// @return Status::OK if successful. May return a bad Status if the
   ///   provided row does not have all columns of the partition key