Posted to commits@kudu.apache.org by jd...@apache.org on 2016/04/18 20:12:08 UTC
[1/2] incubator-kudu git commit: KUDU-1312 - design doc for scan token API
Repository: incubator-kudu
Updated Branches:
refs/heads/master 22160e33c -> d50964eb4
KUDU-1312 - design doc for scan token API
Change-Id: Id208cecababf15e1671a01a219d4599adfcd4163
Reviewed-on: http://gerrit.cloudera.org:8080/2443
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins
Reviewed-by: Jean-Daniel Cryans
Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/051e1325
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/051e1325
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/051e1325
Branch: refs/heads/master
Commit: 051e1325a99ae1e871e8a259b14d9080fbea25b4
Parents: 22160e3
Author: Dan Burkert <da...@cloudera.com>
Authored: Thu Mar 3 14:17:38 2016 -0800
Committer: Jean-Daniel Cryans <jd...@gerrit.cloudera.org>
Committed: Mon Apr 18 18:08:12 2016 +0000
----------------------------------------------------------------------
docs/design-docs/README.md | 2 +-
docs/design-docs/scan-tokens.md | 83 ++++++++++++++++++++++++++++++++++++
2 files changed, 84 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/051e1325/docs/design-docs/README.md
----------------------------------------------------------------------
diff --git a/docs/design-docs/README.md b/docs/design-docs/README.md
index 5ecc549..24566e6 100644
--- a/docs/design-docs/README.md
+++ b/docs/design-docs/README.md
@@ -19,7 +19,6 @@ written from a point-of-time view, and do not necessarily represent the current
state of the system. They are useful for learning why design decisions were
made.
-
| Document | Component(s) | Discussion |
| -------- | ------------ | ---------- |
| [Scan optimization and partition pruning](scan-optimization-partition-pruning.md) | Client, Tablet | [gerrit](http://gerrit.cloudera.org:8080/2149) |
@@ -35,3 +34,4 @@ made.
| [Maintenance operation scheduling](triggering-maintenance-ops.md) | Master, Tablet Server | N/A |
| [C++ client design and impl. details](cpp-client.md) | Client | N/A |
| [(old) Heartbeating between tservers and multiple masters](old-multi-master-heartbeating.md) | Master | [gerrit](http://gerrit.cloudera.org:8080/2495) |
+| [Scan Token API](scan-tokens.md) | Client | [gerrit](http://gerrit.cloudera.org:8080/2443) |
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/051e1325/docs/design-docs/scan-tokens.md
----------------------------------------------------------------------
diff --git a/docs/design-docs/scan-tokens.md b/docs/design-docs/scan-tokens.md
new file mode 100644
index 0000000..748b8e6
--- /dev/null
+++ b/docs/design-docs/scan-tokens.md
@@ -0,0 +1,83 @@
+<!---
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Scan Token API
+
+## Motivation
+
+Most distributed compute frameworks that integrate with Kudu need the ability to
+split Kudu tables into physical sections so that computation can be distributed
+and parallelized. Additionally, these frameworks often want to take advantage of
+data locality by executing tasks on or close to the physical machines that hold
+the data they are working on.
+
+Kudu should have a well-defined API that all compute integrations can use to
+implement parallel scanning with locality hints. This API should be amenable to
+serialization so that individual scanner tasks may be shipped to remote task
+executors.
+
+## Design
+
+Kudu will provide a client API that takes a scan description (e.g. table name,
+projected columns, fault tolerance, snapshot timestamp, lower and upper primary
+key bounds, predicates, etc.) and returns a sequence of scan tokens. For
+example:
+
+```java
+ScanTokenBuilder builder = client.newScanTokenBuilder(table);
+builder.setProjectedColumnNames(ImmutableList.of("col1", "col2"));
+List<ScanToken> tokens = builder.build();
+```
+
+Each scan token can be used to create a scanner over a single tablet. Scan
+tokens also have a well-defined serialization format, opaque to the client,
+so that the compute framework can serialize and deserialize them, and even
+pass them between processes that use different Kudu client versions or
+implementations (JVM vs. C++). Continuing the previous example:
+
+```java
+byte[] serializedToken = tokens.get(0).serialize();
+
+// later, possibly in a different process
+
+KuduScanner scanner = ScanToken.deserializeIntoScanner(serializedToken, client);
+```
+
+Along with the serializable scan token, the API will provide a location hint
+containing the replicas hosting the data. This will be done via the existing
+replica location APIs (`org.kududb.client.LocatedTablet` in the Java client, and
+`std::vector<KuduTabletServer*>` in the C++ client).
+
+Initially, the scan token API should support creating a single token per tablet
+in the table (excluding tablets that can be pruned, if the client supports
+pruning; see the [partition pruning design doc](scan-optimization-partition-pruning.md)).
+Internally, limiting a token to a single tablet should be done by including
+partition key limits in the token, and setting those limits on the scanner when
+deserializing the token. Alternatively, the tablet ID could be included directly
+in the token, but this may have unintended consequences if tablet splitting is
+ever added to Kudu: a token could be created before a split and its scan run
+after the split, when the token's tablet ID would refer to the pre-split tablet.
+Because partition key bounds describe a key range rather than a tablet, a scanner
+limited by them retrieves results from all of the child tablets.
+
+Eventually, the scan token API should allow applications to further split scan
+tokens so that intra-tablet parallelism can be achieved. A token can be split
+by assigning its child tokens non-overlapping sections of the primary key
+range. Even without token splitting built into the API, applications can
+simulate the effect by building multiple sets of scan tokens using
+non-overlapping sets of primary key bounds. However, Kudu will likely be able
+to choose better primary key split points than the application in the future,
+perhaps through an internal tablet statistics API. Having splitting built into
+the Kudu client would also reduce the effort required to write
+high-performance integrations for Kudu.
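The workaround described in the design doc, building several sets of scan tokens over non-overlapping primary key bounds, leaves the choice of split points to the application. Below is a minimal sketch of one naive strategy, assuming a single integer primary-key column with roughly uniform keys. `KeyRangeSplitter`, `KeyRange`, and `split` are hypothetical names, not part of the Kudu client; each resulting range would be used as the primary key bounds of its own scan token builder.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper: divides the half-open key range [lower, upper) of an
 * integer primary-key column into at most n contiguous, non-overlapping
 * sub-ranges. Assumes (upper - lower) does not overflow a long.
 */
public final class KeyRangeSplitter {

  /** A half-open [lower, upper) range of long keys. */
  public static final class KeyRange {
    public final long lower; // inclusive
    public final long upper; // exclusive

    KeyRange(long lower, long upper) {
      this.lower = lower;
      this.upper = upper;
    }

    @Override
    public String toString() {
      return "[" + lower + ", " + upper + ")";
    }
  }

  public static List<KeyRange> split(long lower, long upper, int n) {
    if (n <= 0 || lower >= upper) {
      throw new IllegalArgumentException("need n > 0 and lower < upper");
    }
    long width = upper - lower;
    long step = width / n;
    long remainder = width % n;
    List<KeyRange> ranges = new ArrayList<>(n);
    long start = lower;
    for (int i = 0; i < n; i++) {
      // Spread the remainder over the first `remainder` sub-ranges so sizes differ by at most 1.
      long end = start + step + (i < remainder ? 1 : 0);
      if (end > start) { // skip empty ranges when n exceeds the number of keys
        ranges.add(new KeyRange(start, end));
      }
      start = end;
    }
    return ranges;
  }

  public static void main(String[] args) {
    // Three sub-ranges that together cover [0, 100) exactly once.
    for (KeyRange r : split(0, 100, 3)) {
      System.out.println(r);
    }
  }
}
```

Equal-width splitting only balances work when keys are uniformly distributed, which is why the design doc expects Kudu itself to eventually choose better split points.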
[2/2] incubator-kudu git commit: [java-client] implement scan token API
Posted by jd...@apache.org.
[java-client] implement scan token API
This is a first pass at implementing the
[scan token](http://gerrit.cloudera.org:8080/#/c/2443/) API for the Java client.
Scan tokens have been integrated into the Kudu MapReduce input format, but token
splitting has been left to a follow-up commit.
Change-Id: I20eff9bf51e893226fc3bc47726565ca62c054e3
Reviewed-on: http://gerrit.cloudera.org:8080/2592
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Reviewed-by: Jean-Daniel Cryans
Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/d50964eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/d50964eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/d50964eb
Branch: refs/heads/master
Commit: d50964eb4492c8d78504313f8b84af8ae501c4a2
Parents: 051e132
Author: Dan Burkert <da...@cloudera.com>
Authored: Mon Mar 7 12:45:56 2016 -0800
Committer: Jean-Daniel Cryans <jd...@gerrit.cloudera.org>
Committed: Mon Apr 18 18:11:34 2016 +0000
----------------------------------------------------------------------
docs/release_notes.adoc | 18 ++
.../client/AbstractKuduScannerBuilder.java | 2 +
.../java/org/kududb/client/AsyncKuduClient.java | 4 +-
.../src/main/java/org/kududb/client/Bytes.java | 2 +-
.../main/java/org/kududb/client/KuduClient.java | 14 +-
.../java/org/kududb/client/KuduPredicate.java | 6 +-
.../java/org/kududb/client/KuduScanToken.java | 315 +++++++++++++++++++
.../java/org/kududb/client/LocatedTablet.java | 4 +-
.../java/org/kududb/client/BaseKuduTest.java | 4 +-
.../java/org/kududb/client/TestKuduClient.java | 79 +++++
.../kududb/mapreduce/KuduTableInputFormat.java | 208 ++++++------
.../mapreduce/KuduTableMapReduceUtil.java | 56 +++-
.../kududb/mapreduce/TestInputFormatJob.java | 20 +-
.../mapreduce/TestKuduTableInputFormat.java | 8 +-
14 files changed, 593 insertions(+), 147 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d50964eb/docs/release_notes.adoc
----------------------------------------------------------------------
diff --git a/docs/release_notes.adoc b/docs/release_notes.adoc
index e8690e4..017cb6c 100644
--- a/docs/release_notes.adoc
+++ b/docs/release_notes.adoc
@@ -53,6 +53,24 @@ By combining all of these properties, Kudu targets support for families of
applications that are difficult or impossible to implement on current-generation
Hadoop storage technologies.
+[[rn_0.9.0]]
+=== Release notes specific to 0.9.0
+
+[[rn_0.9.0_incompatible_changes]]
+==== Incompatible changes
+
+- The KuduTableInputFormat has changed how it handles scan predicates, including
+ how it serializes predicates to the job configuration object. The new
+ configuration key is "kudu.mapreduce.encoded.predicate". Clients using the
+ TableInputFormatConfigurator should not be affected.
+
+[[rn_0.9.0_new_features]]
+==== New features
+
+- link:https://issues.apache.org/jira/browse/KUDU-1306[KUDU-1306] Scan token API
+ for creating partition-aware scan descriptors. Scan tokens can be used by
+ clients and query engines to execute parallel scans more easily.
+
[[rn_0.8.0]]
=== Release notes specific to 0.8.0
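The incompatible-change note says predicates are now stored in the job configuration under "kudu.mapreduce.encoded.predicate". Job configurations hold strings, so serialized predicate bytes have to be text-encoded first. Below is a minimal sketch of that round trip, assuming Base64 as the encoding (the exact format used by KuduTableInputFormat is not shown in this excerpt) and using a plain `Map` as a stand-in for Hadoop's `Configuration`. `PredicateConfigSketch` and its methods are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch: storing opaque serialized predicate bytes in a string-valued job
 * configuration. The Map stands in for Hadoop's Configuration; Base64 is an
 * assumed encoding, not the confirmed KuduTableInputFormat format.
 */
public final class PredicateConfigSketch {
  // Key name taken from the 0.9.0 release note.
  static final String ENCODED_PREDICATE_KEY = "kudu.mapreduce.encoded.predicate";

  static void storePredicates(Map<String, String> conf, byte[] serialized) {
    conf.put(ENCODED_PREDICATE_KEY, Base64.getEncoder().encodeToString(serialized));
  }

  static byte[] loadPredicates(Map<String, String> conf) {
    String encoded = conf.get(ENCODED_PREDICATE_KEY);
    // A missing key is treated as "no predicates configured".
    return encoded == null ? new byte[0] : Base64.getDecoder().decode(encoded);
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    byte[] placeholder = "placeholder predicate bytes".getBytes(StandardCharsets.UTF_8);
    storePredicates(conf, placeholder);
    System.out.println(conf.get(ENCODED_PREDICATE_KEY));
    System.out.println(loadPredicates(conf).length == placeholder.length);
  }
}
```

Jobs that go through TableInputFormatConfigurator would not perform this step themselves, which matches the note that they should not be affected.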
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d50964eb/java/kudu-client/src/main/java/org/kududb/client/AbstractKuduScannerBuilder.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/AbstractKuduScannerBuilder.java b/java/kudu-client/src/main/java/org/kududb/client/AbstractKuduScannerBuilder.java
index ae65aaf..0b1e60b 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/AbstractKuduScannerBuilder.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/AbstractKuduScannerBuilder.java
@@ -86,7 +86,9 @@ public abstract class AbstractKuduScannerBuilder
* @param predicateBytes predicates to add
* @return this instance
* @throws IllegalArgumentException thrown when the passed bytes aren't valid
+ * @deprecated use {@link #addPredicate}
*/
+ @Deprecated
public S addColumnRangePredicatesRaw(byte[] predicateBytes) {
for (Tserver.ColumnRangePredicatePB pb : ColumnRangePredicate.fromByteArray(predicateBytes)) {
addPredicate(ColumnRangePredicate.fromPb(pb).toKuduPredicate());
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d50964eb/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java
index 51a6dd2..d7c5b4d 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java
@@ -676,7 +676,7 @@ public class AsyncKuduClient implements AutoCloseable {
* <p>
* Use {@code AsyncUtil.addCallbacksDeferring} to add this as the callback and
* {@link AsyncKuduClient.RetryRpcErrback} as the "errback" to the {@code Deferred}
- * returned by {@link #locateTablet(String, byte[])}.
+ * returned by {@link #locateTablet(KuduTable, byte[])}.
* @param <R> RPC's return type.
* @param <D> Previous query's return type, which we don't use, but need to specify in order to
* tie it all together.
@@ -700,7 +700,7 @@ public class AsyncKuduClient implements AutoCloseable {
* <p>
* Use {@code AsyncUtil.addCallbacksDeferring} to add this as the "errback" and
* {@link RetryRpcCB} as the callback to the {@code Deferred} returned by
- * {@link #locateTablet(String, byte[])}.
+ * {@link #locateTablet(KuduTable, byte[])}.
* @see #delayedSendRpcToTablet(KuduRpc, KuduException)
* @param <R> The type of the original RPC.
*/
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d50964eb/java/kudu-client/src/main/java/org/kududb/client/Bytes.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/Bytes.java b/java/kudu-client/src/main/java/org/kududb/client/Bytes.java
index 624536e..2c6554d 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/Bytes.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/Bytes.java
@@ -1080,7 +1080,7 @@ public final class Bytes {
}
/**
- * Utility methd to write a byte array to a data output. Equivalent of doing a writeInt of the
+ * Utility method to write a byte array to a data output. Equivalent of doing a writeInt of the
* length followed by a write of the byte array. Convert back with {@link #readByteArray}
* @param dataOutput
* @param b
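The corrected javadoc describes a simple length-prefixed format: `writeInt` of the length, then the raw bytes. A standalone sketch of that format using only `java.io`, so the round trip can be seen without the Kudu classes. `LengthPrefixed` is a hypothetical name, and the real `Bytes` methods may differ in details such as input validation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

/** Standalone illustration of a length-prefixed byte array encoding (not Kudu's Bytes class). */
public final class LengthPrefixed {

  static void writeByteArray(DataOutput out, byte[] b) throws IOException {
    out.writeInt(b.length); // 4-byte big-endian length prefix
    out.write(b);           // followed by the payload itself
  }

  static byte[] readByteArray(DataInput in) throws IOException {
    int length = in.readInt();
    if (length < 0) {
      throw new IOException("negative length: " + length);
    }
    byte[] b = new byte[length];
    in.readFully(b); // reads exactly `length` bytes or throws EOFException
    return b;
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bos);
    writeByteArray(out, new byte[] {1, 2, 3});
    writeByteArray(out, new byte[0]);
    out.flush();
    // 4 + 3 bytes for the first array, 4 + 0 for the empty one.
    System.out.println(bos.size()); // prints 11

    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
    System.out.println(readByteArray(in).length); // prints 3
    System.out.println(readByteArray(in).length); // prints 0
  }
}
```

Because the length comes first, several arrays can be written back to back and read in the same order without any separator.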
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d50964eb/java/kudu-client/src/main/java/org/kududb/client/KuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/KuduClient.java b/java/kudu-client/src/main/java/org/kududb/client/KuduClient.java
index a2003b6..6ca0939 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/KuduClient.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/KuduClient.java
@@ -198,15 +198,25 @@ public class KuduClient implements AutoCloseable {
/**
* Creates a new {@link KuduScanner.KuduScannerBuilder} for a particular table.
- * @param table the name of the table you intend to scan.
+ * @param table the table you intend to scan.
* The string is assumed to use the platform's default charset.
- * @return a new scanner builder for this table
+ * @return a new scanner builder for the table
*/
public KuduScanner.KuduScannerBuilder newScannerBuilder(KuduTable table) {
return new KuduScanner.KuduScannerBuilder(asyncClient, table);
}
/**
+ * Creates a new {@link KuduScanToken.KuduScanTokenBuilder} for a particular table.
+ * Used for integrations with compute frameworks.
+ * @param table the table you intend to scan
+ * @return a new scan token builder for the table
+ */
+ public KuduScanToken.KuduScanTokenBuilder newScanTokenBuilder(KuduTable table) {
+ return new KuduScanToken.KuduScanTokenBuilder(asyncClient, table);
+ }
+
+ /**
* Analogous to {@link #shutdown()}.
* @throws Exception if an error happens while closing the connections
*/
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d50964eb/java/kudu-client/src/main/java/org/kududb/client/KuduPredicate.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/KuduPredicate.java b/java/kudu-client/src/main/java/org/kududb/client/KuduPredicate.java
index 32045ba..dea94c4 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/KuduPredicate.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/KuduPredicate.java
@@ -427,7 +427,8 @@ public class KuduPredicate {
* Convert the predicate to the protobuf representation.
* @return the protobuf message for this predicate.
*/
- Common.ColumnPredicatePB toPB() {
+ @InterfaceAudience.Private
+ public Common.ColumnPredicatePB toPB() {
Common.ColumnPredicatePB.Builder builder = Common.ColumnPredicatePB.newBuilder();
builder.setColumn(column.getName());
@@ -462,7 +463,8 @@ public class KuduPredicate {
* Convert a column predicate protobuf message into a predicate.
* @return a predicate
*/
- static KuduPredicate fromPB(Schema schema, Common.ColumnPredicatePB pb) {
+ @InterfaceAudience.Private
+ public static KuduPredicate fromPB(Schema schema, Common.ColumnPredicatePB pb) {
ColumnSchema column = schema.getColumn(pb.getColumn());
switch (pb.getPredicateCase()) {
case EQUALITY: {
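Making `toPB` and `fromPB` public lets scan tokens carry predicates. As the hunk shows, `fromPB` finds its column by name (`schema.getColumn(pb.getColumn())`) in whatever schema the receiver passes in. Below is a minimal sketch of that general "resolve by name, then validate" pattern, including a type check like the one the token code applies to projected columns. `ColumnResolver`, its `Type` enum, and the map-based schema are hypothetical stand-ins for Kudu's `Schema` and `ColumnSchema`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Sketch of name-based column resolution: a serialized description names its
 * column, and the receiver looks the name up in the schema it currently sees,
 * rejecting unknown columns and type mismatches. All types here are hypothetical.
 */
public final class ColumnResolver {
  enum Type { INT32, INT64, STRING }

  // Insertion order defines column positions, like a table schema.
  private final Map<String, Type> schema = new LinkedHashMap<>();

  ColumnResolver addColumn(String name, Type type) {
    schema.put(name, type);
    return this;
  }

  /** Returns the column's index in the schema, or throws if it cannot be used. */
  int resolve(String name, Type expected) {
    int idx = 0;
    for (Map.Entry<String, Type> e : schema.entrySet()) {
      if (e.getKey().equals(name)) {
        if (e.getValue() != expected) {
          throw new IllegalArgumentException(
              String.format("Column types do not match for column %s", name));
        }
        return idx;
      }
      idx++;
    }
    throw new IllegalArgumentException("unknown column " + name);
  }

  public static void main(String[] args) {
    ColumnResolver s = new ColumnResolver()
        .addColumn("key", Type.STRING)
        .addColumn("c1", Type.INT32);
    System.out.println(s.resolve("c1", Type.INT32)); // prints 1
  }
}
```

Resolving by name rather than by position means a column dropped or retyped between serialization and use surfaces as an error when the description is rebuilt, instead of silently reading the wrong column.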
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d50964eb/java/kudu-client/src/main/java/org/kududb/client/KuduScanToken.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/KuduScanToken.java b/java/kudu-client/src/main/java/org/kududb/client/KuduScanToken.java
new file mode 100644
index 0000000..382a4f0
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/kududb/client/KuduScanToken.java
@@ -0,0 +1,315 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.kududb.client;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.ZeroCopyLiteralByteString;
+import org.kududb.ColumnSchema;
+import org.kududb.Common;
+import org.kududb.annotations.InterfaceAudience;
+import org.kududb.annotations.InterfaceStability;
+import org.kududb.client.Client.ScanTokenPB;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A scan token describes a partial scan of a Kudu table limited to a single
+ * contiguous physical location. Using the {@link KuduScanTokenBuilder}, clients can
+ * describe the desired scan, including predicates, bounds, timestamps, and
+ * caching, and receive back a collection of scan tokens.
+ *
+ * Each scan token may be separately turned into a scanner using
+ * {@link #intoScanner}, with each scanner responsible for a disjoint section
+ * of the table.
+ *
+ * Scan tokens may be serialized using the {@link #serialize} method and
+ * deserialized back into a scanner using the {@link #deserializeIntoScanner}
+ * method. This allows use cases such as generating scan tokens in the planner
+ * component of a query engine, then sending the tokens to execution nodes based
+ * on locality, and then instantiating the scanners on those nodes.
+ *
+ * Scan token locality information can be inspected using the {@link #getTablet}
+ * method.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class KuduScanToken implements Comparable<KuduScanToken> {
+ private final LocatedTablet tablet;
+ private final ScanTokenPB message;
+
+ private KuduScanToken(LocatedTablet tablet, ScanTokenPB message) {
+ this.tablet = tablet;
+ this.message = message;
+ }
+
+ /**
+ * Returns the tablet which the scanner created from this token will access.
+ * @return the located tablet
+ */
+ public LocatedTablet getTablet() {
+ return tablet;
+ }
+
+ /**
+ * Creates a {@link KuduScanner} from this scan token.
+ * @param client a Kudu client for the cluster
+ * @return a scanner for the scan token
+ */
+ public KuduScanner intoScanner(KuduClient client) throws Exception {
+ return pbIntoScanner(message, client);
+ }
+
+ /**
+ * Serializes this {@code KuduScanToken} into a byte array.
+ * @return the serialized scan token
+ * @throws IOException
+ */
+ public byte[] serialize() throws IOException {
+ byte[] buf = new byte[message.getSerializedSize()];
+ CodedOutputStream cos = CodedOutputStream.newInstance(buf);
+ message.writeTo(cos);
+ cos.flush();
+ return buf;
+ }
+
+ /**
+ * Deserializes a {@code KuduScanToken} into a {@link KuduScanner}.
+ * @param buf a byte array containing the serialized scan token.
+ * @param client a Kudu client for the cluster
+ * @return a scanner for the serialized scan token
+ * @throws Exception
+ */
+ public static KuduScanner deserializeIntoScanner(byte[] buf, KuduClient client) throws Exception {
+ return pbIntoScanner(ScanTokenPB.parseFrom(CodedInputStream.newInstance(buf)), client);
+ }
+
+ private static KuduScanner pbIntoScanner(ScanTokenPB message,
+ KuduClient client) throws Exception {
+ Preconditions.checkArgument(
+ !message.getFeatureFlagsList().contains(ScanTokenPB.Feature.Unknown),
+ "Scan token requires an unsupported feature. This Kudu client must be updated.");
+
+ KuduTable table = client.openTable(message.getTableName());
+ KuduScanner.KuduScannerBuilder builder = client.newScannerBuilder(table);
+
+ List<Integer> columns = new ArrayList<>(message.getProjectedColumnsCount());
+ for (Common.ColumnSchemaPB column : message.getProjectedColumnsList()) {
+ int columnIdx = table.getSchema().getColumnIndex(column.getName());
+ ColumnSchema schema = table.getSchema().getColumnByIndex(columnIdx);
+ Preconditions.checkArgument(column.getType() == schema.getType().getDataType(),
+ String.format("Column types do not match for column %s",
+ column.getName()));
+ columns.add(columnIdx);
+ }
+ builder.setProjectedColumnIndexes(columns);
+
+ for (Common.ColumnPredicatePB pred : message.getColumnPredicatesList()) {
+ builder.addPredicate(KuduPredicate.fromPB(table.getSchema(), pred));
+ }
+
+ if (message.hasLowerBoundPrimaryKey()) {
+ builder.lowerBoundRaw(message.getLowerBoundPrimaryKey().toByteArray());
+ }
+ if (message.hasUpperBoundPrimaryKey()) {
+ builder.exclusiveUpperBoundRaw(message.getUpperBoundPrimaryKey().toByteArray());
+ }
+
+ if (message.hasLowerBoundPartitionKey()) {
+ builder.lowerBoundPartitionKeyRaw(message.getLowerBoundPartitionKey().toByteArray());
+ }
+ if (message.hasUpperBoundPartitionKey()) {
+ builder.exclusiveUpperBoundPartitionKeyRaw(message.getUpperBoundPartitionKey().toByteArray());
+ }
+
+ if (message.hasLimit()) {
+ builder.limit(message.getLimit());
+ }
+
+ if (message.hasFaultTolerant()) {
+ // TODO(KUDU-1040)
+ }
+
+ if (message.hasReadMode()) {
+ switch (message.getReadMode()) {
+ case READ_AT_SNAPSHOT: {
+ builder.readMode(AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT);
+ if (message.hasSnapTimestamp()) {
+ builder.snapshotTimestampRaw(message.getSnapTimestamp());
+ }
+ break;
+ }
+ case READ_LATEST: {
+ builder.readMode(AsyncKuduScanner.ReadMode.READ_LATEST);
+ break;
+ }
+ default: throw new IllegalArgumentException("unknown read mode");
+ }
+ }
+
+ if (message.hasPropagatedTimestamp()) {
+ // TODO (KUDU-1411)
+ }
+
+ if (message.hasCacheBlocks()) {
+ builder.cacheBlocks(message.getCacheBlocks());
+ }
+
+ return builder.build();
+ }
+
+ @Override
+ public int compareTo(KuduScanToken other) {
+ if (!message.getTableName().equals(other.message.getTableName())) {
+ throw new IllegalArgumentException("Scan tokens from different tables may not be compared");
+ }
+
+ return tablet.getPartition().compareTo(other.getTablet().getPartition());
+ }
+
+ /**
+ * Builds a sequence of scan tokens.
+ */
+ @InterfaceAudience.Public
+ @InterfaceStability.Unstable
+ public static class KuduScanTokenBuilder
+ extends AbstractKuduScannerBuilder<KuduScanTokenBuilder, List<KuduScanToken>> {
+
+ private long timeout;
+
+ KuduScanTokenBuilder(AsyncKuduClient client, KuduTable table) {
+ super(client, table);
+ timeout = client.getDefaultOperationTimeoutMs();
+ }
+
+ /**
+ * Sets a timeout value to use when building the list of scan tokens. If
+ * unset, the client operation timeout will be used.
+ * @param timeoutMs the timeout in milliseconds.
+ */
+ public KuduScanTokenBuilder setTimeout(long timeoutMs) {
+ timeout = timeoutMs;
+ return this;
+ }
+
+ @Override
+ public List<KuduScanToken> build() {
+ if (lowerBoundPartitionKey != AsyncKuduClient.EMPTY_ARRAY ||
+ upperBoundPartitionKey != AsyncKuduClient.EMPTY_ARRAY) {
+ throw new IllegalArgumentException(
+ "Partition key bounds may not be set on KuduScanTokenBuilder");
+ }
+
+ // If the scan is short-circuitable, then return no tokens.
+ for (KuduPredicate predicate : predicates.values()) {
+ if (predicate.getType() == KuduPredicate.PredicateType.NONE) {
+ return ImmutableList.of();
+ }
+ }
+
+ Client.ScanTokenPB.Builder proto = Client.ScanTokenPB.newBuilder();
+
+ proto.setTableName(table.getName());
+
+ // Map the column names or indices to actual columns in the table schema.
+ // If the user did not set either projection, then scan all columns.
+ if (projectedColumnNames != null) {
+ for (String columnName : projectedColumnNames) {
+ ColumnSchema columnSchema = table.getSchema().getColumn(columnName);
+ Preconditions.checkArgument(columnSchema != null, "unknown column %s", columnName);
+ ProtobufHelper.columnToPb(proto.addProjectedColumnsBuilder(), columnSchema);
+ }
+ } else if (projectedColumnIndexes != null) {
+ for (int columnIdx : projectedColumnIndexes) {
+ ColumnSchema columnSchema = table.getSchema().getColumnByIndex(columnIdx);
+ Preconditions.checkArgument(columnSchema != null, "unknown column index %s", columnIdx);
+ ProtobufHelper.columnToPb(proto.addProjectedColumnsBuilder(), columnSchema);
+ }
+ } else {
+ for (ColumnSchema column : table.getSchema().getColumns()) {
+ ProtobufHelper.columnToPb(proto.addProjectedColumnsBuilder(), column);
+ }
+ }
+
+ for (KuduPredicate predicate : predicates.values()) {
+ proto.addColumnPredicates(predicate.toPB());
+ }
+
+ if (lowerBoundPrimaryKey != AsyncKuduClient.EMPTY_ARRAY && lowerBoundPrimaryKey.length > 0) {
+ proto.setLowerBoundPrimaryKey(ZeroCopyLiteralByteString.copyFrom(lowerBoundPrimaryKey));
+ }
+ if (upperBoundPrimaryKey != AsyncKuduClient.EMPTY_ARRAY && upperBoundPrimaryKey.length > 0) {
+ proto.setUpperBoundPrimaryKey(ZeroCopyLiteralByteString.copyFrom(upperBoundPrimaryKey));
+ }
+ if (lowerBoundPartitionKey != AsyncKuduClient.EMPTY_ARRAY &&
+ lowerBoundPartitionKey.length > 0) {
+ proto.setLowerBoundPartitionKey(ZeroCopyLiteralByteString.copyFrom(lowerBoundPartitionKey));
+ }
+ if (upperBoundPartitionKey != AsyncKuduClient.EMPTY_ARRAY &&
+ upperBoundPartitionKey.length > 0) {
+ proto.setUpperBoundPartitionKey(ZeroCopyLiteralByteString.copyFrom(upperBoundPartitionKey));
+ }
+
+ proto.setLimit(limit);
+ proto.setReadMode(readMode.pbVersion());
+
+ // If the last propagated timestamp is set send it with the scan.
+ if (table.getAsyncClient().getLastPropagatedTimestamp() != AsyncKuduClient.NO_TIMESTAMP) {
+ proto.setPropagatedTimestamp(client.getLastPropagatedTimestamp());
+ }
+
+ // If the mode is set to read on snapshot set the snapshot timestamp.
+ if (readMode == AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT &&
+ htTimestamp != AsyncKuduClient.NO_TIMESTAMP) {
+ proto.setSnapTimestamp(htTimestamp);
+ }
+
+ proto.setCacheBlocks(cacheBlocks);
+
+ try {
+ List<LocatedTablet> tablets;
+ if (table.getPartitionSchema().isSimpleRangePartitioning()) {
+ // TODO: replace this with proper partition pruning.
+ tablets = table.getTabletsLocations(
+ lowerBoundPrimaryKey.length == 0 ? null : lowerBoundPrimaryKey,
+ upperBoundPrimaryKey.length == 0 ? null : upperBoundPrimaryKey,
+ timeout);
+ } else {
+ tablets = table.getTabletsLocations(timeout);
+ }
+
+ List<KuduScanToken> tokens = new ArrayList<>(tablets.size());
+ for (LocatedTablet tablet : tablets) {
+ Client.ScanTokenPB.Builder builder = proto.clone();
+ builder.setLowerBoundPartitionKey(
+ ZeroCopyLiteralByteString.wrap(tablet.getPartition().partitionKeyStart));
+ builder.setUpperBoundPartitionKey(
+ ZeroCopyLiteralByteString.wrap(tablet.getPartition().partitionKeyEnd));
+ tokens.add(new KuduScanToken(tablet, builder.build()));
+ }
+ return tokens;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
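`KuduScanTokenBuilder.build()` gives each token its tablet's `partitionKeyStart` and `partitionKeyEnd` rather than a tablet ID, following the design doc. Below is a minimal sketch of why that holds up if a tablet were ever split: a scan over a partition-key range visits every tablet whose range overlaps it. It assumes half-open ranges, unsigned lexicographic key ordering, and an empty array meaning "unbounded" on that side. `PartitionRanges`, `Tablet`, and `tabletsFor` are hypothetical names, and the split shown is illustrative only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch: selecting tablets by partition-key range overlap (hypothetical types). */
public final class PartitionRanges {

  static final class Tablet {
    final String id;
    final byte[] start; // inclusive; empty = unbounded below
    final byte[] end;   // exclusive; empty = unbounded above

    Tablet(String id, byte[] start, byte[] end) {
      this.id = id;
      this.start = start;
      this.end = end;
    }
  }

  /** Unsigned lexicographic comparison of encoded keys. */
  static int compare(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(a.length, b.length);
  }

  /** Returns the IDs of tablets overlapping the scan range [lower, upper). */
  static List<String> tabletsFor(List<Tablet> tablets, byte[] lower, byte[] upper) {
    List<String> ids = new ArrayList<>();
    for (Tablet t : tablets) {
      // The tablet ends at or before the scan starts.
      boolean endsBefore = t.end.length > 0 && compare(t.end, lower) <= 0;
      // The tablet starts at or after the scan ends.
      boolean startsAfter = upper.length > 0 && compare(t.start, upper) >= 0;
      if (!endsBefore && !startsAfter) {
        ids.add(t.id);
      }
    }
    return ids;
  }

  public static void main(String[] args) {
    // A token created for a tablet covering ["", "m") keeps those bounds.
    // Suppose that tablet is later split at "g" into T1a and T1b.
    List<Tablet> afterSplit = Arrays.asList(
        new Tablet("T1a", new byte[0], new byte[] {'g'}),
        new Tablet("T1b", new byte[] {'g'}, new byte[] {'m'}),
        new Tablet("T2", new byte[] {'m'}, new byte[0]));
    System.out.println(tabletsFor(afterSplit, new byte[0], new byte[] {'m'})); // prints [T1a, T1b]
  }
}
```

A token that stored the parent tablet's ID would have nothing to resolve to after such a split, whereas the stored key range still maps onto exactly the child tablets.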
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d50964eb/java/kudu-client/src/main/java/org/kududb/client/LocatedTablet.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/LocatedTablet.java b/java/kudu-client/src/main/java/org/kududb/client/LocatedTablet.java
index 28caa77..e25af9b 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/LocatedTablet.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/LocatedTablet.java
@@ -46,11 +46,11 @@ public class LocatedTablet {
this.partition = ProtobufHelper.pbToPartition(pb.getPartition());
this.tabletId = pb.getTabletId().toByteArray();
- List<Replica> reps = Lists.newArrayList();
+ ImmutableList.Builder<Replica> reps = ImmutableList.builder();
for (ReplicaPB repPb : pb.getReplicasList()) {
reps.add(new Replica(repPb));
}
- this.replicas = ImmutableList.copyOf(reps);
+ this.replicas = reps.build();
}
public List<Replica> getReplicas() {
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d50964eb/java/kudu-client/src/test/java/org/kududb/client/BaseKuduTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/kududb/client/BaseKuduTest.java b/java/kudu-client/src/test/java/org/kududb/client/BaseKuduTest.java
index 7014630..996c746 100644
--- a/java/kudu-client/src/test/java/org/kududb/client/BaseKuduTest.java
+++ b/java/kudu-client/src/test/java/org/kududb/client/BaseKuduTest.java
@@ -104,7 +104,9 @@ public class BaseKuduTest {
// shutting down the async client effectively does that.
}
} finally {
- miniCluster.shutdown();
+ if (miniCluster != null) {
+ miniCluster.shutdown();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d50964eb/java/kudu-client/src/test/java/org/kududb/client/TestKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/kududb/client/TestKuduClient.java b/java/kudu-client/src/test/java/org/kududb/client/TestKuduClient.java
index 7ff8a18..3893062 100644
--- a/java/kudu-client/src/test/java/org/kududb/client/TestKuduClient.java
+++ b/java/kudu-client/src/test/java/org/kududb/client/TestKuduClient.java
@@ -26,17 +26,24 @@ import static org.kududb.client.KuduPredicate.ComparisonOp.LESS;
import static org.kududb.client.KuduPredicate.ComparisonOp.LESS_EQUAL;
import static org.kududb.client.RowResult.timestampToString;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterators;
+
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Before;
import org.junit.Test;
import org.kududb.ColumnSchema;
import org.kududb.Schema;
import org.kududb.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TestKuduClient extends BaseKuduTest {
+ private static final Logger LOG = LoggerFactory.getLogger(TestKuduClient.class);
private String tableName;
@Before
@@ -309,6 +316,78 @@ public class TestKuduClient extends BaseKuduTest {
}
/**
+ * Tests scan tokens by creating a set of scan tokens, serializing them, and
+ * then executing them in parallel with separate client instances. This
+ * simulates the normal use case of scan tokens being created at a central
+ * planner and distributed to remote task executors.
+ */
+ @Test
+ public void testScanTokens() throws Exception {
+ Schema schema = createManyStringsSchema();
+ CreateTableOptions createOptions = new CreateTableOptions();
+ createOptions.addHashPartitions(ImmutableList.of("key"), 8);
+
+ PartialRow splitRow = schema.newPartialRow();
+ splitRow.addString("key", "key_50");
+ createOptions.addSplitRow(splitRow);
+
+ syncClient.createTable(tableName, schema, createOptions);
+
+ KuduSession session = syncClient.newSession();
+ session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
+ KuduTable table = syncClient.openTable(tableName);
+ for (int i = 0; i < 100; i++) {
+ Insert insert = table.newInsert();
+ PartialRow row = insert.getRow();
+ row.addString("key", String.format("key_%02d", i));
+ row.addString("c1", "c1_" + i);
+ row.addString("c2", "c2_" + i);
+ session.apply(insert);
+ }
+ session.flush();
+
+ KuduScanToken.KuduScanTokenBuilder tokenBuilder = syncClient.newScanTokenBuilder(table);
+ tokenBuilder.setProjectedColumnIndexes(ImmutableList.<Integer>of());
+ List<KuduScanToken> tokens = tokenBuilder.build();
+ assertEquals(16, tokens.size());
+
+ final AtomicInteger count = new AtomicInteger(0);
+ List<Thread> threads = new ArrayList<>();
+ for (final KuduScanToken token : tokens) {
+ final byte[] serializedToken = token.serialize();
+ Thread thread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try (KuduClient contextClient = new KuduClient.KuduClientBuilder(masterAddresses)
+ .defaultAdminOperationTimeoutMs(DEFAULT_SLEEP)
+ .build()) {
+ KuduScanner scanner = KuduScanToken.deserializeIntoScanner(serializedToken, contextClient);
+ try {
+ int localCount = 0;
+ while (scanner.hasMoreRows()) {
+ localCount += Iterators.size(scanner.nextRows());
+ }
+ assertTrue(localCount > 0);
+ count.addAndGet(localCount);
+ } finally {
+ scanner.close();
+ }
+ } catch (Exception e) {
+ LOG.error("exception in parallel token scanner", e);
+ }
+ }
+ });
+ thread.start();
+ threads.add(thread);
+ }
+
+ for (Thread thread : threads) {
+ thread.join();
+ }
+ assertEquals(100, count.get());
+ }
+
+ /**
* Creates a local client that we auto-close while buffering one row, then makes sure that after
* closing that we can read the row.
*/
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d50964eb/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableInputFormat.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableInputFormat.java b/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableInputFormat.java
index 4b6187c..25235cb 100644
--- a/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableInputFormat.java
+++ b/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableInputFormat.java
@@ -17,28 +17,13 @@ package org.kududb.mapreduce;
import com.google.common.base.Objects;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
-import org.apache.commons.net.util.Base64;
-import org.kududb.Schema;
-import org.kududb.annotations.InterfaceAudience;
-import org.kududb.annotations.InterfaceStability;
-import org.kududb.client.*;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.net.DNS;
+import com.google.common.primitives.UnsignedBytes;
-import javax.naming.NamingException;
+import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@@ -46,6 +31,35 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import javax.naming.NamingException;
+
+import org.apache.commons.net.util.Base64;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.net.DNS;
+import org.kududb.Common;
+import org.kududb.Schema;
+import org.kududb.annotations.InterfaceAudience;
+import org.kududb.annotations.InterfaceStability;
+import org.kududb.client.AsyncKuduClient;
+import org.kududb.client.Bytes;
+import org.kududb.client.KuduClient;
+import org.kududb.client.KuduPredicate;
+import org.kududb.client.KuduScanner;
+import org.kududb.client.KuduTable;
+import org.kududb.client.LocatedTablet;
+import org.kududb.client.RowResult;
+import org.kududb.client.RowResultIterator;
+import org.kududb.client.KuduScanToken;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* <p>
@@ -65,9 +79,7 @@ import java.util.Map;
public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
implements Configurable {
- private static final Log LOG = LogFactory.getLog(KuduTableInputFormat.class);
-
- private static final long SLEEP_TIME_FOR_RETRIES_MS = 1000;
+ private static final Logger LOG = LoggerFactory.getLogger(KuduTableInputFormat.class);
/** Job parameter that specifies the input table. */
static final String INPUT_TABLE_KEY = "kudu.mapreduce.input.table";
@@ -84,9 +96,9 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
/** Job parameter that specifies the address for the name server. */
static final String NAME_SERVER_KEY = "kudu.mapreduce.name.server";
- /** Job parameter that specifies the encoded column range predicates (may be empty). */
- static final String ENCODED_COLUMN_RANGE_PREDICATES_KEY =
- "kudu.mapreduce.encoded.column.range.predicates";
+ /** Job parameter that specifies the encoded column predicates (may be empty). */
+ static final String ENCODED_PREDICATES_KEY =
+ "kudu.mapreduce.encoded.predicates";
/**
* Job parameter that specifies the column projection as a comma-separated list of column names.
@@ -102,7 +114,7 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
* The reverse DNS lookup cache mapping: address from Kudu => hostname for Hadoop. This cache is
* used in order to not do DNS lookups multiple times for each tablet server.
*/
- private final Map<String, String> reverseDNSCacheMap = new HashMap<String, String>();
+ private final Map<String, String> reverseDNSCacheMap = new HashMap<>();
private Configuration conf;
private KuduClient client;
@@ -111,7 +123,7 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
private String nameServer;
private boolean cacheBlocks;
private List<String> projectedCols;
- private byte[] rawPredicates;
+ private List<KuduPredicate> predicates;
@Override
public List<InputSplit> getSplits(JobContext jobContext)
@@ -120,57 +132,25 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
if (table == null) {
throw new IOException("No table was provided");
}
- List<InputSplit> splits;
- DeadlineTracker deadline = new DeadlineTracker();
- deadline.setDeadline(operationTimeoutMs);
- // If the job is started while a leader election is running, we might not be able to find a
- // leader right away. We'll wait as long as the user is willing to wait with the operation
- // timeout, and once we've waited long enough we just start picking the first replica we see
- // for those tablets that don't have a leader. The client will later try to find the leader
- // and it might fail, in which case the task will get retried.
- retryloop:
- while (true) {
- List<LocatedTablet> locations;
- try {
- locations = table.getTabletsLocations(operationTimeoutMs);
- } catch (Exception e) {
- throw new IOException("Could not get the tablets locations", e);
- }
- if (locations.isEmpty()) {
- throw new IOException("The requested table has 0 tablets, cannot continue");
- }
+ KuduScanToken.KuduScanTokenBuilder tokenBuilder = client.newScanTokenBuilder(table)
+ .setProjectedColumnNames(projectedCols)
+ .cacheBlocks(cacheBlocks)
+ .setTimeout(operationTimeoutMs);
+ for (KuduPredicate predicate : predicates) {
+ tokenBuilder.addPredicate(predicate);
+ }
+ List<KuduScanToken> tokens = tokenBuilder.build();
- // For the moment we only pass the leader since that's who we read from.
- // If we've been trying to get a leader for each tablet for too long, we stop looping
- // and just finish with what we have.
- splits = new ArrayList<InputSplit>(locations.size());
- for (LocatedTablet locatedTablet : locations) {
- List<String> addresses = Lists.newArrayList();
- LocatedTablet.Replica replica = locatedTablet.getLeaderReplica();
- if (replica == null) {
- if (deadline.wouldSleepingTimeout(SLEEP_TIME_FOR_RETRIES_MS)) {
- LOG.debug("We ran out of retries, picking a non-leader replica for this tablet: " +
- locatedTablet.toString());
- // We already checked it's not empty.
- replica = locatedTablet.getReplicas().get(0);
- } else {
- LOG.debug("Retrying creating the splits because this tablet is missing a leader: " +
- locatedTablet.toString());
- Thread.sleep(SLEEP_TIME_FOR_RETRIES_MS);
- continue retryloop;
- }
- }
- addresses.add(reverseDNS(replica.getRpcHost(), replica.getRpcPort()));
- String[] addressesArray = addresses.toArray(new String[addresses.size()]);
- Partition partition = locatedTablet.getPartition();
- TableSplit split = new TableSplit(partition.getPartitionKeyStart(),
- partition.getPartitionKeyEnd(),
- addressesArray);
- splits.add(split);
+ List<InputSplit> splits = new ArrayList<>(tokens.size());
+ for (KuduScanToken token : tokens) {
+ List<String> locations = new ArrayList<>(token.getTablet().getReplicas().size());
+ for (LocatedTablet.Replica replica : token.getTablet().getReplicas()) {
+ locations.add(reverseDNS(replica.getRpcHost(), replica.getRpcPort()));
}
- return splits;
+ splits.add(new TableSplit(token, locations.toArray(new String[locations.size()])));
}
+ return splits;
} finally {
shutdownClient();
}
@@ -226,13 +206,13 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
String tableName = conf.get(INPUT_TABLE_KEY);
String masterAddresses = conf.get(MASTER_ADDRESSES_KEY);
this.operationTimeoutMs = conf.getLong(OPERATION_TIMEOUT_MS_KEY,
- AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS);
+ AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS);
this.nameServer = conf.get(NAME_SERVER_KEY);
this.cacheBlocks = conf.getBoolean(SCAN_CACHE_BLOCKS, false);
this.client = new KuduClient.KuduClientBuilder(masterAddresses)
- .defaultOperationTimeoutMs(operationTimeoutMs)
- .build();
+ .defaultOperationTimeoutMs(operationTimeoutMs)
+ .build();
try {
this.table = client.openTable(tableName);
} catch (Exception ex) {
@@ -259,8 +239,17 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
}
}
- String encodedPredicates = conf.get(ENCODED_COLUMN_RANGE_PREDICATES_KEY, "");
- rawPredicates = Base64.decodeBase64(encodedPredicates);
+ this.predicates = new ArrayList<>();
+ try {
+ InputStream is =
+ new ByteArrayInputStream(Base64.decodeBase64(conf.get(ENCODED_PREDICATES_KEY, "")));
+ while (is.available() > 0) {
+ this.predicates.add(KuduPredicate.fromPB(table.getSchema(),
+ Common.ColumnPredicatePB.parseDelimitedFrom(is)));
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("unable to deserialize predicates from the configuration", e);
+ }
}
/**
@@ -284,18 +273,31 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
static class TableSplit extends InputSplit implements Writable, Comparable<TableSplit> {
- private byte[] startPartitionKey;
- private byte[] endPartitionKey;
+ /** The scan token that the split will use to scan the Kudu table. */
+ private byte[] scanToken;
+
+ /** The start partition key of the scan. Used for sorting splits. */
+ private byte[] partitionKey;
+
+ /** Tablet server locations which host the tablet to be scanned. */
private String[] locations;
public TableSplit() { } // Writable
- public TableSplit(byte[] startPartitionKey, byte[] endPartitionKey, String[] locations) {
- this.startPartitionKey = startPartitionKey;
- this.endPartitionKey = endPartitionKey;
+ public TableSplit(KuduScanToken token, String[] locations) throws IOException {
+ this.scanToken = token.serialize();
+ this.partitionKey = token.getTablet().getPartition().getPartitionKeyStart();
this.locations = locations;
}
+ public byte[] getScanToken() {
+ return scanToken;
+ }
+
+ public byte[] getPartitionKey() {
+ return partitionKey;
+ }
+
@Override
public long getLength() throws IOException, InterruptedException {
// TODO Guesstimate a size
@@ -307,23 +309,15 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
return locations;
}
- public byte[] getStartPartitionKey() {
- return startPartitionKey;
- }
-
- public byte[] getEndPartitionKey() {
- return endPartitionKey;
- }
-
@Override
- public int compareTo(TableSplit tableSplit) {
- return Bytes.memcmp(startPartitionKey, tableSplit.getStartPartitionKey());
+ public int compareTo(TableSplit other) {
+ return UnsignedBytes.lexicographicalComparator().compare(partitionKey, other.partitionKey);
}
@Override
public void write(DataOutput dataOutput) throws IOException {
- Bytes.writeByteArray(dataOutput, startPartitionKey);
- Bytes.writeByteArray(dataOutput, endPartitionKey);
+ Bytes.writeByteArray(dataOutput, scanToken);
+ Bytes.writeByteArray(dataOutput, partitionKey);
dataOutput.writeInt(locations.length);
for (String location : locations) {
byte[] str = Bytes.fromString(location);
@@ -333,8 +327,8 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
@Override
public void readFields(DataInput dataInput) throws IOException {
- startPartitionKey = Bytes.readByteArray(dataInput);
- endPartitionKey = Bytes.readByteArray(dataInput);
+ scanToken = Bytes.readByteArray(dataInput);
+ partitionKey = Bytes.readByteArray(dataInput);
locations = new String[dataInput.readInt()];
for (int i = 0; i < locations.length; i++) {
byte[] str = Bytes.readByteArray(dataInput);
@@ -344,8 +338,8 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
@Override
public int hashCode() {
- // We currently just care about the row key since we're within the same table
- return Arrays.hashCode(startPartitionKey);
+ // We currently just care about the partition key since we're within the same table.
+ return Arrays.hashCode(partitionKey);
}
@Override
@@ -361,8 +355,7 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
@Override
public String toString() {
return Objects.toStringHelper(this)
- .add("startPartitionKey", Bytes.pretty(startPartitionKey))
- .add("endPartitionKey", Bytes.pretty(endPartitionKey))
+ .add("partitionKey", Bytes.pretty(partitionKey))
.add("locations", Arrays.toString(locations))
.toString();
}
@@ -383,13 +376,12 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
}
split = (TableSplit) inputSplit;
- scanner = client.newScannerBuilder(table)
- .setProjectedColumnNames(projectedCols)
- .lowerBoundPartitionKeyRaw(split.getStartPartitionKey())
- .exclusiveUpperBoundPartitionKeyRaw(split.getEndPartitionKey())
- .cacheBlocks(cacheBlocks)
- .addColumnRangePredicatesRaw(rawPredicates)
- .build();
+
+ try {
+ scanner = KuduScanToken.deserializeIntoScanner(split.getScanToken(), client);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
// Calling this now to set iterator.
tryRefreshIterator();
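After this change, `TableSplit` carries the serialized scan token and the tablet's start partition key. It orders splits by comparing partition keys as unsigned bytes, using Guava's `UnsignedBytes.lexicographicalComparator()` where it previously used `Bytes.memcmp`. The sketch below mirrors that shape without Hadoop, Guava, or Kudu. The class name `SplitSketch` is hypothetical, and the hand-written comparator stands in for the Guava one. Prefixing each byte array with a `writeInt` length is an assumption made for illustration and is not necessarily the encoding `Bytes.writeByteArray` uses.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical, dependency-free stand-in for KuduTableInputFormat.TableSplit. */
class SplitSketch implements Comparable<SplitSketch> {
  byte[] scanToken;
  byte[] partitionKey;
  String[] locations;

  SplitSketch() { }  // no-arg constructor used before readFields, as with Writable

  SplitSketch(byte[] scanToken, byte[] partitionKey, String[] locations) {
    this.scanToken = scanToken;
    this.partitionKey = partitionKey;
    this.locations = locations;
  }

  /** Lexicographic comparison treating each byte as unsigned (0x00..0xFF). */
  static int compareUnsigned(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int cmp = (a[i] & 0xFF) - (b[i] & 0xFF);
      if (cmp != 0) {
        return cmp;
      }
    }
    return a.length - b.length;  // a strict prefix sorts first
  }

  @Override
  public int compareTo(SplitSketch other) {
    return compareUnsigned(partitionKey, other.partitionKey);
  }

  // Assumed encoding: int length followed by the raw bytes.
  static void writeBytes(DataOutput out, byte[] bytes) throws IOException {
    out.writeInt(bytes.length);
    out.write(bytes);
  }

  static byte[] readBytes(DataInput in) throws IOException {
    byte[] bytes = new byte[in.readInt()];
    in.readFully(bytes);
    return bytes;
  }

  void write(DataOutput out) throws IOException {
    writeBytes(out, scanToken);
    writeBytes(out, partitionKey);
    out.writeInt(locations.length);
    for (String location : locations) {
      writeBytes(out, location.getBytes(StandardCharsets.UTF_8));
    }
  }

  void readFields(DataInput in) throws IOException {
    scanToken = readBytes(in);
    partitionKey = readBytes(in);
    locations = new String[in.readInt()];
    for (int i = 0; i < locations.length; i++) {
      locations[i] = new String(readBytes(in), StandardCharsets.UTF_8);
    }
  }

  /** Serializes and deserializes a split, as happens when a split is shipped to a task. */
  static SplitSketch roundTrip(SplitSketch split) throws IOException {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    split.write(new DataOutputStream(buffer));
    SplitSketch copy = new SplitSketch();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
    return copy;
  }
}
```

With unsigned comparison, a key starting with `0x80` sorts after one starting with `0x7F`. A signed byte comparison would order them the other way, because `(byte) 0x80` is -128.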
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d50964eb/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableMapReduceUtil.java b/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableMapReduceUtil.java
index c8fa5e9..e9d0162 100644
--- a/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableMapReduceUtil.java
+++ b/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableMapReduceUtil.java
@@ -14,6 +14,20 @@
*/
package org.kududb.mapreduce;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.net.util.Base64;
@@ -29,16 +43,10 @@ import org.kududb.annotations.InterfaceAudience;
import org.kududb.annotations.InterfaceStability;
import org.kududb.client.AsyncKuduClient;
import org.kududb.client.ColumnRangePredicate;
+import org.kududb.client.KuduPredicate;
import org.kududb.client.KuduTable;
import org.kududb.client.Operation;
-import java.io.IOException;
-import java.net.URL;
-import java.net.URLDecoder;
-import java.util.*;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipFile;
-
/**
* Utility class to setup MR jobs that use Kudu as an input and/or output.
*/
@@ -145,7 +153,7 @@ public class KuduTableMapReduceUtil {
protected long operationTimeoutMs = AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS;
protected final String columnProjection;
protected boolean cacheBlocks;
- protected List<ColumnRangePredicate> columnRangePredicates = new ArrayList<>();
+ protected List<KuduPredicate> predicates = new ArrayList<>();
/**
* Constructor for the required fields to configure.
@@ -188,10 +196,7 @@ public class KuduTableMapReduceUtil {
conf.set(KuduTableInputFormat.COLUMN_PROJECTION_KEY, columnProjection);
}
- if (!columnRangePredicates.isEmpty()) {
- conf.set(KuduTableInputFormat.ENCODED_COLUMN_RANGE_PREDICATES_KEY,
- base64EncodePredicates(columnRangePredicates));
- }
+ conf.set(KuduTableInputFormat.ENCODED_PREDICATES_KEY, base64EncodePredicates(predicates));
if (addDependencies) {
addDependencyJars(job);
@@ -199,9 +204,17 @@ public class KuduTableMapReduceUtil {
}
}
- static String base64EncodePredicates(List<ColumnRangePredicate> predicates) {
- byte[] predicateBytes = ColumnRangePredicate.toByteArray(predicates);
- return Base64.encodeBase64String(predicateBytes);
+ /**
+ * Returns the provided predicates as a Base64 encoded string.
+ * @param predicates the predicates to encode
+ * @return the encoded predicates
+ */
+ static String base64EncodePredicates(List<KuduPredicate> predicates) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ for (KuduPredicate predicate : predicates) {
+ predicate.toPB().writeDelimitedTo(baos);
+ }
+ return Base64.encodeBase64String(baos.toByteArray());
}
@@ -297,9 +310,20 @@ public class KuduTableMapReduceUtil {
* Adds a new predicate that will be pushed down to all the tablets.
* @param predicate a predicate to add
* @return this instance
+ * @deprecated use {@link #addPredicate}
*/
+ @Deprecated
public TableInputFormatConfigurator addColumnRangePredicate(ColumnRangePredicate predicate) {
- this.columnRangePredicates.add(predicate);
+ return addPredicate(predicate.toKuduPredicate());
+ }
+
+ /**
+ * Adds a new predicate that will be pushed down to all the tablets.
+ * @param predicate a predicate to add
+ * @return this instance
+ */
+ public TableInputFormatConfigurator addPredicate(KuduPredicate predicate) {
+ this.predicates.add(predicate);
return this;
}
}
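`base64EncodePredicates` writes each predicate's protobuf with `writeDelimitedTo` (a varint length prefix followed by the message bytes), then Base64-encodes the buffer. `setConf` reverses the process, calling `parseDelimitedFrom` until the stream is exhausted. An empty predicate list therefore encodes to an empty string and decodes to no predicates, which is why `configure()` can now set the key unconditionally. The sketch below reproduces only the framing, under stated assumptions. Raw `byte[]` payloads stand in for serialized `ColumnPredicatePB` messages, `java.util.Base64` replaces the commons-net `Base64`, and the class name is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;

/** Hypothetical sketch of length-delimited framing plus Base64, with byte[] payloads. */
class DelimitedBase64Sketch {

  /** Writes an unsigned varint: 7 bits per byte, low-order group first, high bit = "more". */
  static void writeVarint(OutputStream out, int value) throws IOException {
    while ((value & ~0x7F) != 0) {
      out.write((value & 0x7F) | 0x80);
      value >>>= 7;
    }
    out.write(value);
  }

  static int readVarint(InputStream in) throws IOException {
    int result = 0;
    for (int shift = 0; shift < 32; shift += 7) {
      int b = in.read();
      if (b < 0) {
        throw new IOException("truncated varint");
      }
      result |= (b & 0x7F) << shift;
      if ((b & 0x80) == 0) {
        return result;
      }
    }
    throw new IOException("malformed varint");
  }

  /** Encodes each message as <varint length><bytes>, then Base64-encodes the whole buffer. */
  static String encode(List<byte[]> messages) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    for (byte[] message : messages) {
      writeVarint(baos, message.length);
      baos.write(message);
    }
    return Base64.getEncoder().encodeToString(baos.toByteArray());
  }

  /** Reverses encode(); an empty string decodes to an empty list. */
  static List<byte[]> decode(String encoded) throws IOException {
    DataInputStream is =
        new DataInputStream(new ByteArrayInputStream(Base64.getDecoder().decode(encoded)));
    List<byte[]> messages = new ArrayList<>();
    while (is.available() > 0) {
      byte[] message = new byte[readVarint(is)];
      is.readFully(message);  // throws EOFException if the message is truncated
      messages.add(message);
    }
    return messages;
  }
}
```

The length prefix is what lets several messages share one buffer. Without it, the reader could not tell where one predicate ends and the next begins.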
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d50964eb/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/TestInputFormatJob.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/TestInputFormatJob.java b/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/TestInputFormatJob.java
index 9f416fc..de15645 100644
--- a/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/TestInputFormatJob.java
+++ b/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/TestInputFormatJob.java
@@ -17,7 +17,6 @@
package org.kududb.mapreduce;
import com.google.common.collect.Lists;
-import org.kududb.client.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
@@ -26,6 +25,9 @@ import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.kududb.client.BaseKuduTest;
+import org.kududb.client.KuduPredicate;
+import org.kududb.client.RowResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,19 +71,19 @@ public class TestInputFormatJob extends BaseKuduTest {
Configuration conf = new Configuration();
HADOOP_UTIL.setupAndGetTestDir(TestInputFormatJob.class.getName(), conf).getAbsolutePath();
- createAndTestJob(conf, new ArrayList<ColumnRangePredicate>(), 9);
+ createAndTestJob(conf, new ArrayList<KuduPredicate>(), 9);
- ColumnRangePredicate pred1 = new ColumnRangePredicate(basicSchema.getColumnByIndex(0));
- pred1.setLowerBound(20);
+ KuduPredicate pred1 = KuduPredicate.newComparisonPredicate(
+ basicSchema.getColumnByIndex(0), KuduPredicate.ComparisonOp.GREATER_EQUAL, 20);
createAndTestJob(conf, Lists.newArrayList(pred1), 6);
- ColumnRangePredicate pred2 = new ColumnRangePredicate(basicSchema.getColumnByIndex(2));
- pred2.setUpperBound(1);
+ KuduPredicate pred2 = KuduPredicate.newComparisonPredicate(
+ basicSchema.getColumnByIndex(2), KuduPredicate.ComparisonOp.LESS_EQUAL, 1);
createAndTestJob(conf, Lists.newArrayList(pred1, pred2), 2);
}
private void createAndTestJob(Configuration conf,
- List<ColumnRangePredicate> predicates, int expectedCount)
+ List<KuduPredicate> predicates, int expectedCount)
throws Exception {
String jobName = TestInputFormatJob.class.getName();
Job job = new Job(conf, jobName);
@@ -100,8 +102,8 @@ public class TestInputFormatJob extends BaseKuduTest {
.operationTimeoutMs(DEFAULT_SLEEP)
.addDependencies(false)
.cacheBlocks(false);
- for (ColumnRangePredicate predicate : predicates) {
- configurator.addColumnRangePredicate(predicate);
+ for (KuduPredicate predicate : predicates) {
+ configurator.addPredicate(predicate);
}
configurator.configure();
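The tests replace `ColumnRangePredicate` lower and upper bounds with `KuduPredicate.newComparisonPredicate` using `GREATER_EQUAL` and `LESS_EQUAL`, which are inclusive bounds. Every configured predicate is pushed down together, so each additional predicate can only narrow the result. The sketch below models that conjunction client-side over hypothetical integer rows. The `Op` enum and `Pred` class are illustrative stand-ins, not Kudu's API, and Kudu evaluates real predicates on the tablets rather than in the client.

```java
import java.util.List;

/** Hypothetical client-side model of AND-ed comparison predicates on int columns. */
class PredicateSketch {
  enum Op { LESS, LESS_EQUAL, EQUAL, GREATER_EQUAL, GREATER }

  static final class Pred {
    final int column;
    final Op op;
    final int value;

    Pred(int column, Op op, int value) {
      this.column = column;
      this.op = op;
      this.value = value;
    }

    /** Returns true if the row's cell in this predicate's column satisfies the comparison. */
    boolean matches(int[] row) {
      int cell = row[column];
      switch (op) {
        case LESS: return cell < value;
        case LESS_EQUAL: return cell <= value;
        case EQUAL: return cell == value;
        case GREATER_EQUAL: return cell >= value;
        case GREATER: return cell > value;
        default: throw new AssertionError(op);
      }
    }
  }

  /** Counts rows satisfying every predicate; an empty predicate list matches all rows. */
  static int count(List<int[]> rows, List<Pred> preds) {
    int n = 0;
    for (int[] row : rows) {
      boolean ok = true;
      for (Pred p : preds) {
        if (!p.matches(row)) {
          ok = false;
          break;
        }
      }
      if (ok) {
        n++;
      }
    }
    return n;
  }
}
```

Because the predicates are AND-ed, adding one can never increase the count. This matches the test's expectations of 9, then 6, then 2 rows.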
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d50964eb/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/TestKuduTableInputFormat.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/TestKuduTableInputFormat.java b/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/TestKuduTableInputFormat.java
index d7942c0..3a13825 100644
--- a/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/TestKuduTableInputFormat.java
+++ b/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/TestKuduTableInputFormat.java
@@ -100,14 +100,14 @@ public class TestKuduTableInputFormat extends BaseKuduTest {
}
// Test using a predicate that filters the row out.
- ColumnRangePredicate pred1 = new ColumnRangePredicate(schema.getColumnByIndex(1));
- pred1.setLowerBound(3);
+ KuduPredicate pred1 = KuduPredicate.newComparisonPredicate(
+ schema.getColumnByIndex(1), KuduPredicate.ComparisonOp.GREATER_EQUAL, 3);
reader = createRecordReader("*", Lists.newArrayList(pred1));
assertFalse(reader.nextKeyValue());
}
private RecordReader<NullWritable, RowResult> createRecordReader(String columnProjection,
- List<ColumnRangePredicate> predicates) throws IOException, InterruptedException {
+ List<KuduPredicate> predicates) throws IOException, InterruptedException {
KuduTableInputFormat input = new KuduTableInputFormat();
Configuration conf = new Configuration();
conf.set(KuduTableInputFormat.MASTER_ADDRESSES_KEY, getMasterAddresses());
@@ -117,7 +117,7 @@ public class TestKuduTableInputFormat extends BaseKuduTest {
}
if (predicates != null) {
String encodedPredicates = KuduTableMapReduceUtil.base64EncodePredicates(predicates);
- conf.set(KuduTableInputFormat.ENCODED_COLUMN_RANGE_PREDICATES_KEY, encodedPredicates);
+ conf.set(KuduTableInputFormat.ENCODED_PREDICATES_KEY, encodedPredicates);
}
input.setConf(conf);
List<InputSplit> splits = input.getSplits(null);