You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by jd...@apache.org on 2016/07/25 17:15:24 UTC
[19/36] incubator-kudu git commit: [java-client] repackage to
org.apache.kudu (Part 1)
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduScanner.java
deleted file mode 100644
index 7699536..0000000
--- a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduScanner.java
+++ /dev/null
@@ -1,894 +0,0 @@
-/*
- * Copyright (C) 2010-2012 The Async HBase Authors. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- * - Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * - Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- * - Neither the name of the StumbleUpon nor the names of its contributors
- * may be used to endorse or promote products derived from this software
- * without specific prior written permission.
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-package org.kududb.client;
-
-import com.google.common.collect.ImmutableList;
-import com.google.protobuf.Message;
-import com.google.protobuf.ZeroCopyLiteralByteString;
-import com.stumbleupon.async.Callback;
-import com.stumbleupon.async.Deferred;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.kududb.ColumnSchema;
-import org.kududb.Common;
-import org.kududb.Schema;
-import org.kududb.annotations.InterfaceAudience;
-import org.kududb.annotations.InterfaceStability;
-import org.kududb.tserver.Tserver;
-import org.kududb.util.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static org.kududb.tserver.Tserver.NewScanRequestPB;
-import static org.kududb.tserver.Tserver.ScanRequestPB;
-import static org.kududb.tserver.Tserver.ScanResponsePB;
-import static org.kududb.tserver.Tserver.TabletServerErrorPB;
-
-/**
- * Creates a scanner to read data from Kudu.
- * <p>
- * This class is <strong>not synchronized</strong> as it's expected to be
- * used from a single thread at a time. It's rarely (if ever?) useful to
- * scan concurrently from a shared scanner using multiple threads. If you
- * want to optimize large table scans using extra parallelism, create a few
- * scanners and give each of them a partition of the table to scan. Or use
- * MapReduce.
- * <p>
- * There's no method in this class to explicitly open the scanner. It will open
- * itself automatically when you start scanning by calling {@link #nextRows()}.
- * Also, the scanner will automatically call {@link #close} when it reaches the
- * end key. If, however, you would like to stop scanning <i>before reaching the
- * end key</i>, you <b>must</b> call {@link #close} before disposing of the scanner.
- * Note that it's always safe to call {@link #close} on a scanner.
- * <p>
- * An {@code AsyncKuduScanner} is not re-usable. Should you want to scan the same rows
- * or the same table again, you must create a new one.
- *
- * <h1>A note on passing {@code byte} arrays in argument</h1>
- * None of the methods that receive a {@code byte[]} in argument will copy it.
- * For more info, please refer to the documentation of {@link KuduRpc}.
- * <h1>A note on passing {@code String}s in argument</h1>
- * All strings are assumed to use the platform's default charset.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Unstable
-public final class AsyncKuduScanner {
-
- private static final Logger LOG = LoggerFactory.getLogger(AsyncKuduScanner.class);
-
- /**
- * The possible read modes for scanners.
- */
- @InterfaceAudience.Public
- @InterfaceStability.Evolving
- public enum ReadMode {
- /**
- * When READ_LATEST is specified the server will always return committed writes at
- * the time the request was received. This type of read does not return a snapshot
- * timestamp and is not repeatable.
- *
- * In ACID terms this corresponds to Isolation mode: "Read Committed"
- *
- * This is the default mode.
- */
- READ_LATEST(Common.ReadMode.READ_LATEST),
-
- /**
- * When READ_AT_SNAPSHOT is specified the server will attempt to perform a read
- * at the provided timestamp. If no timestamp is provided the server will take the
- * current time as the snapshot timestamp. In this mode reads are repeatable, i.e.
- * all future reads at the same timestamp will yield the same data. This is
- * performed at the expense of waiting for in-flight transactions whose timestamp
- * is lower than the snapshot's timestamp to complete, so it might incur a latency
- * penalty.
- *
- * In ACID terms this, by itself, corresponds to Isolation mode "Repeatable
- * Read". If all writes to the scanned tablet are made externally consistent,
- * then this corresponds to Isolation mode "Strict-Serializable".
- *
- * Note: there are currently "holes", which happen in rare edge conditions, by which writes
- * are sometimes not externally consistent even when action was taken to make them so.
- * In these cases Isolation may degenerate to mode "Read Committed". See KUDU-430.
- */
- READ_AT_SNAPSHOT(Common.ReadMode.READ_AT_SNAPSHOT);
-
- private Common.ReadMode pbVersion;
- ReadMode(Common.ReadMode pbVersion) {
- this.pbVersion = pbVersion;
- }
-
- @InterfaceAudience.Private
- public Common.ReadMode pbVersion() {
- return this.pbVersion;
- }
- }
-
- //////////////////////////
- // Initial configurations.
- //////////////////////////
-
- private final AsyncKuduClient client;
- private final KuduTable table;
- private final Schema schema;
-
- /**
- * Map of column name to predicate.
- */
- private final Map<String, KuduPredicate> predicates;
-
- /**
- * Maximum number of bytes returned by the scanner, on each batch.
- */
- private final int batchSizeBytes;
-
- /**
- * The maximum number of rows to scan.
- */
- private final long limit;
-
- /**
- * The start partition key of the next tablet to scan.
- *
- * Each time the scan exhausts a tablet, this is updated to that tablet's end partition key.
- */
- private byte[] nextPartitionKey;
-
- /**
- * The end partition key of the last tablet to scan.
- */
- private final byte[] endPartitionKey;
-
- /**
- * Set in the builder. If it's not set by the user, it will default to EMPTY_ARRAY.
- * It is declared final and never modified after that; when non-empty, it is sent as the start
- * primary key each time a scanner is opened on a tablet, as the scan moves between tablets.
- */
- private final byte[] startPrimaryKey;
-
- /**
- * Set in the builder. If it's not set by the user, it will default to EMPTY_ARRAY.
- * It's never modified after that.
- */
- private final byte[] endPrimaryKey;
-
- private final boolean prefetching;
-
- private final boolean cacheBlocks;
-
- private final ReadMode readMode;
-
- private final Common.OrderMode orderMode;
-
- private final long htTimestamp;
-
- /////////////////////
- // Runtime variables.
- /////////////////////
-
- private boolean closed = false;
-
- private boolean hasMore = true;
-
- /**
- * The tablet currently being scanned.
- * If null, we haven't started scanning, or the scanner was invalidated or closed
- * (see {@link #invalidate()}) and must be re-opened before fetching more rows.
- * Otherwise it references the remote tablet we're currently scanning.
- */
- private AsyncKuduClient.RemoteTablet tablet;
-
- /**
- * This is the scanner ID we got from the TabletServer.
- * It's generated randomly so any value is possible.
- */
- private byte[] scannerId;
-
- /**
- * The sequence ID of this call. The sequence ID should start at 0
- * with the request for a new scanner, and after each successful request,
- * the client should increment it by 1. When retrying a request, the client
- * should _not_ increment this value. If the server detects that the client
- * missed a chunk of rows from the middle of a scan, it will respond with an
- * error.
- */
- private int sequenceId;
-
- private Deferred<RowResultIterator> prefetcherDeferred;
-
- private boolean inFirstTablet = true;
-
- final long scanRequestTimeout;
-
- private static final AtomicBoolean PARTITION_PRUNE_WARN = new AtomicBoolean(true);
-
- AsyncKuduScanner(AsyncKuduClient client, KuduTable table, List<String> projectedNames,
- List<Integer> projectedIndexes, ReadMode readMode, Common.OrderMode orderMode,
- long scanRequestTimeout,
- Map<String, KuduPredicate> predicates, long limit,
- boolean cacheBlocks, boolean prefetching,
- byte[] startPrimaryKey, byte[] endPrimaryKey,
- byte[] startPartitionKey, byte[] endPartitionKey,
- long htTimestamp, int batchSizeBytes) {
- checkArgument(batchSizeBytes > 0, "Need a strictly positive number of bytes, " +
- "got %s", batchSizeBytes);
- checkArgument(limit > 0, "Need a strictly positive number for the limit, " +
- "got %s", limit);
- if (htTimestamp != AsyncKuduClient.NO_TIMESTAMP) {
- checkArgument(htTimestamp >= 0, "Need non-negative number for the scan, " +
- " timestamp got %s", htTimestamp);
- checkArgument(readMode == ReadMode.READ_AT_SNAPSHOT, "When specifying a " +
- "HybridClock timestamp, the read mode needs to be set to READ_AT_SNAPSHOT");
- }
- if (orderMode == Common.OrderMode.ORDERED) {
- checkArgument(readMode == ReadMode.READ_AT_SNAPSHOT, "Returning rows in primary key order " +
- "requires the read mode to be set to READ_AT_SNAPSHOT");
- }
-
- this.client = client;
- this.table = table;
- this.readMode = readMode;
- this.orderMode = orderMode;
- this.scanRequestTimeout = scanRequestTimeout;
- this.predicates = predicates;
- this.limit = limit;
- this.cacheBlocks = cacheBlocks;
- this.prefetching = prefetching;
- this.startPrimaryKey = startPrimaryKey;
- this.endPrimaryKey = endPrimaryKey;
- this.htTimestamp = htTimestamp;
- this.batchSizeBytes = batchSizeBytes;
-
- if (!table.getPartitionSchema().isSimpleRangePartitioning() &&
- (startPrimaryKey != AsyncKuduClient.EMPTY_ARRAY ||
- endPrimaryKey != AsyncKuduClient.EMPTY_ARRAY) &&
- PARTITION_PRUNE_WARN.getAndSet(false)) {
- LOG.warn("Starting full table scan. " +
- "In the future this scan may be automatically optimized with partition pruning.");
- }
-
- if (table.getPartitionSchema().isSimpleRangePartitioning()) {
- // If the table is simple range partitioned, then the partition key space
- // is isomorphic to the primary key space. We can potentially reduce the
- // scan length by only scanning the intersection of the primary key range
- // and the partition key range. This is a stop-gap until real partition
- // pruning is in place that can work across any partitioning type.
-
- if ((endPartitionKey.length != 0 && Bytes.memcmp(startPrimaryKey, endPartitionKey) >= 0) ||
- (endPrimaryKey.length != 0 && Bytes.memcmp(startPartitionKey, endPrimaryKey) >= 0)) {
- // The primary key range and the partition key range do not intersect;
- // the scan will be empty.
- this.nextPartitionKey = startPartitionKey;
- this.endPartitionKey = endPartitionKey;
- } else {
- // Assign the scan's partition key range to the intersection of the
- // primary key and partition key ranges.
- if (Bytes.memcmp(startPartitionKey, startPrimaryKey) < 0) {
- this.nextPartitionKey = startPrimaryKey;
- } else {
- this.nextPartitionKey = startPartitionKey;
- }
- if (endPrimaryKey.length != 0 && Bytes.memcmp(endPartitionKey, endPrimaryKey) > 0) {
- this.endPartitionKey = endPrimaryKey;
- } else {
- this.endPartitionKey = endPartitionKey;
- }
- }
- } else {
- this.nextPartitionKey = startPartitionKey;
- this.endPartitionKey = endPartitionKey;
- }
-
- // Map the column names to actual columns in the table schema.
- // If the user set this to 'null', we scan all columns.
- if (projectedNames != null) {
- List<ColumnSchema> columns = new ArrayList<ColumnSchema>();
- for (String columnName : projectedNames) {
- ColumnSchema originalColumn = table.getSchema().getColumn(columnName);
- columns.add(getStrippedColumnSchema(originalColumn));
- }
- this.schema = new Schema(columns);
- } else if (projectedIndexes != null) {
- List<ColumnSchema> columns = new ArrayList<ColumnSchema>();
- for (Integer columnIndex : projectedIndexes) {
- ColumnSchema originalColumn = table.getSchema().getColumnByIndex(columnIndex);
- columns.add(getStrippedColumnSchema(originalColumn));
- }
- this.schema = new Schema(columns);
- } else {
- this.schema = table.getSchema();
- }
-
- // If any of the column predicates are of type None (the predicate is known
- // to match no rows), then the scan can be short circuited without
- // contacting any tablet servers.
- boolean shortCircuit = false;
- for (KuduPredicate predicate : this.predicates.values()) {
- if (predicate.getType() == KuduPredicate.PredicateType.NONE) {
- shortCircuit = true;
- break;
- }
- }
- if (shortCircuit) {
- LOG.debug("Short circuiting scan with predicates: {}", predicates.values());
- this.hasMore = false;
- this.closed = true;
- }
- }
-
- /**
- * Clone the given column schema instance. The new instance will include only the name, type, and
- * nullability of the passed one.
- * @return a new column schema
- */
- private static ColumnSchema getStrippedColumnSchema(ColumnSchema columnToClone) {
- return new ColumnSchema.ColumnSchemaBuilder(columnToClone.getName(), columnToClone.getType())
- .nullable(columnToClone.isNullable())
- .build();
- }
-
- /**
- * Returns the maximum number of rows that this scanner was configured to return.
- * @return a long representing the maximum number of rows that can be returned
- */
- public long getLimit() {
- return this.limit;
- }
-
- /**
- * Tells if the last rpc returned that there might be more rows to scan.
- * @return true if there might be more data to scan, else false
- */
- public boolean hasMoreRows() {
- return this.hasMore;
- }
-
- /**
- * Returns if this scanner was configured to cache data blocks or not.
- * @return true if this scanner will cache blocks, else false.
- */
- public boolean getCacheBlocks() {
- return this.cacheBlocks;
- }
-
- /**
- * Returns the maximum number of bytes returned by the scanner, on each batch.
- * @return a long representing the maximum number of bytes that a scanner can receive at once
- * from a tablet server
- */
- public long getBatchSizeBytes() {
- return this.batchSizeBytes;
- }
-
- /**
- * Returns the ReadMode for this scanner.
- * @return the configured read mode for this scanner
- */
- public ReadMode getReadMode() {
- return this.readMode;
- }
-
- private Common.OrderMode getOrderMode() {
- return this.orderMode;
- }
-
- /**
- * Returns the projection schema of this scanner. If specific columns were
- * not specified during scanner creation, the table schema is returned.
- * @return the projection schema for this scanner
- */
- public Schema getProjectionSchema() {
- return this.schema;
- }
-
- long getSnapshotTimestamp() {
- return this.htTimestamp;
- }
-
- /**
- * Scans a number of rows.
- * <p>
- * Once this method returns {@code null} once (which indicates that this
- * {@code Scanner} is done scanning), calling it again leads to an undefined
- * behavior.
- * @return a deferred list of rows.
- */
- public Deferred<RowResultIterator> nextRows() {
- if (closed) { // We're already done scanning.
- return Deferred.fromResult(null);
- } else if (tablet == null) {
-
- Callback<Deferred<RowResultIterator>, AsyncKuduScanner.Response> cb =
- new Callback<Deferred<RowResultIterator>, Response>() {
- @Override
- public Deferred<RowResultIterator> call(Response resp) throws Exception {
- if (!resp.more || resp.scanner_id == null) {
- scanFinished();
- return Deferred.fromResult(resp.data); // there might be data to return
- }
- scannerId = resp.scanner_id;
- sequenceId++;
- hasMore = resp.more;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Scanner " + Bytes.pretty(scannerId) + " opened on " + tablet);
- }
- return Deferred.fromResult(resp.data);
- }
- public String toString() {
- return "scanner opened";
- }
- };
-
- Callback<Deferred<RowResultIterator>, Exception> eb =
- new Callback<Deferred<RowResultIterator>, Exception>() {
- @Override
- public Deferred<RowResultIterator> call(Exception e) throws Exception {
- invalidate();
- if (e instanceof NonCoveredRangeException) {
- NonCoveredRangeException ncre = (NonCoveredRangeException) e;
- nextPartitionKey = ncre.getNonCoveredRangeEnd();
-
- // Stop scanning if the non-covered range is past the end partition key.
- if (ncre.getNonCoveredRangeEnd().length == 0
- || (endPartitionKey != AsyncKuduClient.EMPTY_ARRAY
- && Bytes.memcmp(endPartitionKey, ncre.getNonCoveredRangeEnd()) <= 0)) {
- hasMore = false;
- closed = true; // the scanner is closed on the other side at this point
- return Deferred.fromResult(RowResultIterator.empty());
- }
- nextPartitionKey = ncre.getNonCoveredRangeEnd();
- scannerId = null;
- sequenceId = 0;
- return nextRows();
- } else {
- LOG.warn("Can not open scanner", e);
- // Don't let the scanner think it's opened on this tablet.
- return Deferred.fromError(e); // Let the error propogate.
- }
- }
- public String toString() {
- return "open scanner errback";
- }
- };
-
- // We need to open the scanner first.
- return client.sendRpcToTablet(getOpenRequest()).addCallbackDeferring(cb).addErrback(eb);
- } else if (prefetching && prefetcherDeferred != null) {
- // TODO KUDU-1260 - Check if this works and add a test
- prefetcherDeferred.chain(new Deferred<RowResultIterator>().addCallback(prefetch));
- return prefetcherDeferred;
- }
- final Deferred<RowResultIterator> d =
- client.scanNextRows(this).addCallbacks(got_next_row, nextRowErrback());
- if (prefetching) {
- d.chain(new Deferred<RowResultIterator>().addCallback(prefetch));
- }
- return d;
- }
-
- private final Callback<RowResultIterator, RowResultIterator> prefetch =
- new Callback<RowResultIterator, RowResultIterator>() {
- @Override
- public RowResultIterator call(RowResultIterator arg) throws Exception {
- if (hasMoreRows()) {
- prefetcherDeferred = client.scanNextRows(AsyncKuduScanner.this).addCallbacks
- (got_next_row, nextRowErrback());
- }
- return null;
- }
- };
-
- /**
- * Singleton callback to handle responses of "next" RPCs.
- * This returns the {@link RowResultIterator} carried by the response, after updating
- * the scanner's state (sequence ID, "has more" flag, or end-of-tablet handling).
- */
- private final Callback<RowResultIterator, Response> got_next_row =
- new Callback<RowResultIterator, Response>() {
- public RowResultIterator call(final Response resp) {
- if (!resp.more) { // We're done scanning this tablet.
- scanFinished();
- return resp.data;
- }
- sequenceId++;
- hasMore = resp.more;
- //LOG.info("Scan.next is returning rows: " + resp.data.getNumRows());
- return resp.data;
- }
- public String toString() {
- return "get nextRows response";
- }
- };
-
- /**
- * Creates a new errback to handle errors while trying to get more rows.
- */
- private final Callback<Exception, Exception> nextRowErrback() {
- return new Callback<Exception, Exception>() {
- public Exception call(final Exception error) {
- final AsyncKuduClient.RemoteTablet old_tablet = tablet; // Save before invalidate().
- String message = old_tablet + " pretends to not know " + AsyncKuduScanner.this;
- LOG.warn(message, error);
- invalidate(); // If there was an error, don't assume we're still OK.
- return error; // Let the error propagate.
- }
- public String toString() {
- return "NextRow errback";
- }
- };
- }
-
- void scanFinished() {
- Partition partition = tablet.getPartition();
- // Stop scanning if we have scanned until or past the end partition key.
- if (partition.isEndPartition()
- || (this.endPartitionKey != AsyncKuduClient.EMPTY_ARRAY
- && Bytes.memcmp(this.endPartitionKey, partition.getPartitionKeyEnd()) <= 0)) {
- hasMore = false;
- closed = true; // the scanner is closed on the other side at this point
- return;
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Done scanning tablet {} for partition {} with scanner id {}",
- tablet.getTabletIdAsString(), tablet.getPartition(), Bytes.pretty(scannerId));
- }
- nextPartitionKey = partition.getPartitionKeyEnd();
- scannerId = null;
- sequenceId = 0;
- invalidate();
- }
-
- /**
- * Closes this scanner (don't forget to call this when you're done with it!).
- * <p>
- * Closing a scanner already closed has no effect. The deferred returned
- * will be called back immediately.
- * @return A deferred object that indicates the completion of the request.
- * The {@link Object} can be null, a RowResultIterator if there was data left
- * in the scanner, or an Exception.
- */
- public Deferred<RowResultIterator> close() {
- if (closed) {
- return Deferred.fromResult(null);
- }
- final Deferred<RowResultIterator> d =
- client.closeScanner(this).addCallback(closedCallback()); // TODO errBack ?
- return d;
- }
-
- /** Callback+Errback invoked when the TabletServer closed our scanner. */
- private Callback<RowResultIterator, Response> closedCallback() {
- return new Callback<RowResultIterator, Response>() {
- public RowResultIterator call(Response response) {
- closed = true;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Scanner " + Bytes.pretty(scannerId) + " closed on "
- + tablet);
- }
- tablet = null;
- scannerId = "client debug closed".getBytes(); // Make debugging easier.
- return response == null ? null : response.data;
- }
- public String toString() {
- return "scanner closed";
- }
- };
- }
-
- public String toString() {
- final String tablet = this.tablet == null ? "null" : this.tablet.getTabletIdAsString();
- final StringBuilder buf = new StringBuilder();
- buf.append("KuduScanner(table=");
- buf.append(table.getName());
- buf.append(", tablet=").append(tablet);
- buf.append(", scannerId=").append(Bytes.pretty(scannerId));
- buf.append(", scanRequestTimeout=").append(scanRequestTimeout);
- buf.append(')');
- return buf.toString();
- }
-
- // ---------------------- //
- // Package private stuff. //
- // ---------------------- //
-
- KuduTable table() {
- return table;
- }
-
- /**
- * Sets the tablet that this scanner is currently scanning.
- * @param tablet The tabletSlice we're currently supposed to be scanning.
- */
- void setTablet(final AsyncKuduClient.RemoteTablet tablet) {
- this.tablet = tablet;
- }
-
- /**
- * Invalidates this scanner and makes it assume it's no longer opened.
- * When a TabletServer goes away while we're scanning it, or some other type
- * of access problem happens, this method should be called so that the
- * scanner will have to re-locate the TabletServer and re-open itself.
- */
- void invalidate() {
- tablet = null;
- }
-
- /**
- * Returns the tabletSlice currently being scanned, if any.
- */
- AsyncKuduClient.RemoteTablet currentTablet() {
- return tablet;
- }
-
- /**
- * Returns an RPC to open this scanner.
- */
- KuduRpc<Response> getOpenRequest() {
- checkScanningNotStarted();
- // This is the only point where we know we haven't started scanning and where the scanner
- // should be fully configured
- if (this.inFirstTablet) {
- this.inFirstTablet = false;
- }
- return new ScanRequest(table, State.OPENING);
- }
-
- /**
- * Returns an RPC to fetch the next rows.
- */
- KuduRpc<Response> getNextRowsRequest() {
- return new ScanRequest(table, State.NEXT);
- }
-
- /**
- * Returns an RPC to close this scanner.
- */
- KuduRpc<Response> getCloseRequest() {
- return new ScanRequest(table, State.CLOSING);
- }
-
- /**
- * Throws an exception if scanning already started.
- * @throws IllegalStateException if scanning already started.
- */
- private void checkScanningNotStarted() {
- if (tablet != null) {
- throw new IllegalStateException("scanning already started");
- }
- }
-
- /**
- * Helper object that contains all the info sent by a TS after a Scan request.
- */
- static final class Response {
- /** The ID associated with the scanner that issued the request. */
- private final byte[] scanner_id;
- /** The actual payload of the response. */
- private final RowResultIterator data;
-
- /**
- * If false, the filter we use decided there was no more data to scan.
- * In this case, the server has automatically closed the scanner for us,
- * so we don't need to explicitly close it.
- */
- private final boolean more;
-
- Response(final byte[] scanner_id,
- final RowResultIterator data,
- final boolean more) {
- this.scanner_id = scanner_id;
- this.data = data;
- this.more = more;
- }
-
- public String toString() {
- return "AsyncKuduScanner$Response(scannerId=" + Bytes.pretty(scanner_id)
- + ", data=" + data + ", more=" + more + ") ";
- }
- }
-
- private enum State {
- OPENING,
- NEXT,
- CLOSING
- }
-
- /**
- * Scan RPC sent to the TabletServer to open the scanner, fetch its next rows, or close it.
- */
- private final class ScanRequest extends KuduRpc<Response> {
-
- State state;
-
- ScanRequest(KuduTable table, State state) {
- super(table);
- this.state = state;
- this.setTimeoutMillis(scanRequestTimeout);
- }
-
- @Override
- String serviceName() { return TABLET_SERVER_SERVICE_NAME; }
-
- @Override
- String method() {
- return "Scan";
- }
-
- @Override
- Collection<Integer> getRequiredFeatures() {
- if (predicates.isEmpty()) {
- return ImmutableList.of();
- } else {
- return ImmutableList.of(Tserver.TabletServerFeatures.COLUMN_PREDICATES_VALUE);
- }
- }
-
- /** Serializes this request. */
- ChannelBuffer serialize(Message header) {
- final ScanRequestPB.Builder builder = ScanRequestPB.newBuilder();
- switch (state) {
- case OPENING:
- // Save the tablet in the AsyncKuduScanner. This is kind of a kludge but it really
- // is the easiest way.
- AsyncKuduScanner.this.tablet = super.getTablet();
- NewScanRequestPB.Builder newBuilder = NewScanRequestPB.newBuilder();
- newBuilder.setLimit(limit); // currently ignored
- newBuilder.addAllProjectedColumns(ProtobufHelper.schemaToListPb(schema));
- newBuilder.setTabletId(ZeroCopyLiteralByteString.wrap(tablet.getTabletIdAsBytes()));
- newBuilder.setReadMode(AsyncKuduScanner.this.getReadMode().pbVersion());
- newBuilder.setOrderMode(AsyncKuduScanner.this.getOrderMode());
- newBuilder.setCacheBlocks(cacheBlocks);
- // if the last propagated timestamp is set send it with the scan
- if (table.getAsyncClient().getLastPropagatedTimestamp() != AsyncKuduClient.NO_TIMESTAMP) {
- newBuilder.setPropagatedTimestamp(table.getAsyncClient().getLastPropagatedTimestamp());
- }
- newBuilder.setReadMode(AsyncKuduScanner.this.getReadMode().pbVersion());
-
- // if the mode is set to read at snapshot, send the snapshot timestamp
- if (AsyncKuduScanner.this.getReadMode() == ReadMode.READ_AT_SNAPSHOT &&
- AsyncKuduScanner.this.getSnapshotTimestamp() != AsyncKuduClient.NO_TIMESTAMP) {
- newBuilder.setSnapTimestamp(AsyncKuduScanner.this.getSnapshotTimestamp());
- }
-
- if (AsyncKuduScanner.this.startPrimaryKey != AsyncKuduClient.EMPTY_ARRAY &&
- AsyncKuduScanner.this.startPrimaryKey.length > 0) {
- newBuilder.setStartPrimaryKey(ZeroCopyLiteralByteString.copyFrom(startPrimaryKey));
- }
-
- if (AsyncKuduScanner.this.endPrimaryKey != AsyncKuduClient.EMPTY_ARRAY &&
- AsyncKuduScanner.this.endPrimaryKey.length > 0) {
- newBuilder.setStopPrimaryKey(ZeroCopyLiteralByteString.copyFrom(endPrimaryKey));
- }
-
- for (KuduPredicate pred : predicates.values()) {
- newBuilder.addColumnPredicates(pred.toPB());
- }
- builder.setNewScanRequest(newBuilder.build())
- .setBatchSizeBytes(batchSizeBytes);
- break;
- case NEXT:
- setTablet(AsyncKuduScanner.this.tablet);
- builder.setScannerId(ZeroCopyLiteralByteString.wrap(scannerId))
- .setCallSeqId(AsyncKuduScanner.this.sequenceId)
- .setBatchSizeBytes(batchSizeBytes);
- break;
- case CLOSING:
- setTablet(AsyncKuduScanner.this.tablet);
- builder.setScannerId(ZeroCopyLiteralByteString.wrap(scannerId))
- .setBatchSizeBytes(0)
- .setCloseScanner(true);
- }
-
- ScanRequestPB request = builder.build();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Sending scan req: " + request.toString());
- }
-
- return toChannelBuffer(header, request);
- }
-
- @Override
- Pair<Response, Object> deserialize(final CallResponse callResponse,
- String tsUUID) throws Exception {
- ScanResponsePB.Builder builder = ScanResponsePB.newBuilder();
- readProtobuf(callResponse.getPBMessage(), builder);
- ScanResponsePB resp = builder.build();
- final byte[] id = resp.getScannerId().toByteArray();
- TabletServerErrorPB error = resp.hasError() ? resp.getError() : null;
-
- if (error != null && error.getCode().equals(TabletServerErrorPB.Code.TABLET_NOT_FOUND)) {
- if (state == State.OPENING) {
- // Doing this will trigger finding the new location.
- return new Pair<Response, Object>(null, error);
- } else {
- Status statusIncomplete = Status.Incomplete("Cannot continue scanning, " +
- "the tablet has moved and this isn't a fault tolerant scan");
- throw new NonRecoverableException(statusIncomplete);
- }
- }
- RowResultIterator iterator = RowResultIterator.makeRowResultIterator(
- deadlineTracker.getElapsedMillis(), tsUUID, schema, resp.getData(),
- callResponse);
-
- boolean hasMore = resp.getHasMoreResults();
- if (id.length != 0 && scannerId != null && !Bytes.equals(scannerId, id)) {
- Status statusIllegalState = Status.IllegalState("Scan RPC response was for scanner"
- + " ID " + Bytes.pretty(id) + " but we expected "
- + Bytes.pretty(scannerId));
- throw new NonRecoverableException(statusIllegalState);
- }
- Response response = new Response(id, iterator, hasMore);
- if (LOG.isDebugEnabled()) {
- LOG.debug(response.toString());
- }
- return new Pair<Response, Object>(response, error);
- }
-
- public String toString() {
- return "ScanRequest(scannerId=" + Bytes.pretty(scannerId)
- + (tablet != null? ", tabletSlice=" + tablet.getTabletIdAsString() : "")
- + ", attempt=" + attempt + ')';
- }
-
- @Override
- public byte[] partitionKey() {
- // This key is used to lookup where the request needs to go
- return nextPartitionKey;
- }
- }
-
- /**
- * A Builder class to build {@link AsyncKuduScanner}.
- * Use {@link AsyncKuduClient#newScannerBuilder} in order to get a builder instance.
- */
- @InterfaceAudience.Public
- @InterfaceStability.Evolving
- public static class AsyncKuduScannerBuilder
- extends AbstractKuduScannerBuilder<AsyncKuduScannerBuilder, AsyncKuduScanner> {
-
- AsyncKuduScannerBuilder(AsyncKuduClient client, KuduTable table) {
- super(client, table);
- }
-
- /**
- * Builds an {@link AsyncKuduScanner} using the passed configurations.
- * @return a new {@link AsyncKuduScanner}
- */
- public AsyncKuduScanner build() {
- return new AsyncKuduScanner(
- client, table, projectedColumnNames, projectedColumnIndexes, readMode, orderMode,
- scanRequestTimeout, predicates, limit, cacheBlocks,
- prefetching, lowerBoundPrimaryKey, upperBoundPrimaryKey,
- lowerBoundPartitionKey, upperBoundPartitionKey,
- htTimestamp, batchSizeBytes);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduSession.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduSession.java b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduSession.java
deleted file mode 100644
index 0290ee7..0000000
--- a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduSession.java
+++ /dev/null
@@ -1,856 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.kududb.client;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Range;
-import com.stumbleupon.async.Callback;
-import com.stumbleupon.async.Deferred;
-import org.kududb.annotations.InterfaceAudience;
-import org.kududb.annotations.InterfaceStability;
-import org.kududb.util.AsyncUtil;
-import org.kududb.util.Slice;
-import org.jboss.netty.util.Timeout;
-import org.jboss.netty.util.TimerTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.concurrent.GuardedBy;
-import javax.annotation.concurrent.NotThreadSafe;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.kududb.client.ExternalConsistencyMode.CLIENT_PROPAGATED;
-
-/**
- * An AsyncKuduSession belongs to a specific AsyncKuduClient, and represents a context in
- * which all read/write data access should take place. Within a session,
- * multiple operations may be accumulated and batched together for better
- * efficiency. Settings like timeouts, priorities, and trace IDs are also set
- * per session.<p>
- *
- * AsyncKuduSession is separate from KuduTable because a given batch or transaction
- * may span multiple tables. This is particularly important in the future when
- * we add ACID support, but even in the context of batching, we may be able to
- * coalesce writes to different tables hosted on the same server into the same
- * RPC.<p>
- *
- * AsyncKuduSession is separate from AsyncKuduClient because, in a multi-threaded
- * application, different threads may need to concurrently execute
- * transactions. Similar to a JDBC "session", transaction boundaries will be
- * delineated on a per-session basis -- in between a "BeginTransaction" and
- * "Commit" call on a given session, all operations will be part of the same
- * transaction. Meanwhile another concurrent Session object can safely run
- * non-transactional work or other transactions without interfering.<p>
- *
- * Therefore, this class is <b>not</b> thread-safe.<p>
- *
- * Additionally, there is a guarantee that writes from different sessions do not
- * get batched together into the same RPCs -- this means that latency-sensitive
- * clients can run through the same AsyncKuduClient object as throughput-oriented
- * clients, perhaps by setting the latency-sensitive session's timeouts low and
- * priorities high. Without the separation of batches, a latency-sensitive
- * single-row insert might get batched along with 10MB worth of inserts from the
- * batch writer, thus delaying the response significantly.<p>
- *
- * Though we currently do not have transactional support, users will be forced
- * to use an AsyncKuduSession to instantiate reads as well as writes. This will make
- * it more straight-forward to add RW transactions in the future without
- * significant modifications to the API.<p>
- *
- * Timeouts are handled differently depending on the flush mode.
- * With AUTO_FLUSH_SYNC, the timeout is set on each apply()'d operation.
- * With AUTO_FLUSH_BACKGROUND and MANUAL_FLUSH, the timeout is assigned to a whole batch of
- * operations upon flush()'ing. It means that in a situation with a timeout of 500ms and a flush
- * interval of 1000ms, an operation can be outstanding for up to 1500ms before being timed out.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Unstable
-@NotThreadSafe
-public class AsyncKuduSession implements SessionConfiguration {
-
- public static final Logger LOG = LoggerFactory.getLogger(AsyncKuduSession.class);
- private static final Range<Float> PERCENTAGE_RANGE = Range.closed(0.0f, 1.0f);
-
- private final AsyncKuduClient client;
- private final Random randomizer = new Random();
- private final ErrorCollector errorCollector;
- private int interval = 1000;
- private int mutationBufferSpace = 1000; // TODO express this in terms of data size.
- private float mutationBufferLowWatermarkPercentage = 0.5f;
- private int mutationBufferLowWatermark;
- private FlushMode flushMode;
- private ExternalConsistencyMode consistencyMode;
- private long timeoutMs;
-
- /**
- * Protects internal state from concurrent access. {@code AsyncKuduSession} is not threadsafe
- * from the application's perspective, but because internally async timers and async flushing
- * tasks may access the session concurrently with the application, synchronization is still
- * needed.
- */
- private final Object monitor = new Object();
-
- /**
- * Tracks the currently active buffer.
- *
- * When in mode {@link FlushMode#AUTO_FLUSH_BACKGROUND} or {@link FlushMode#AUTO_FLUSH_SYNC},
- * {@code AsyncKuduSession} uses double buffering to improve write throughput. While the
- * application is {@link #apply}ing operations to one buffer (the {@code activeBuffer}), the
- * second buffer is either being flushed, or if it has already been flushed, it waits in the
- * {@link #inactiveBuffers} queue. When the currently active buffer is flushed,
- * {@code activeBuffer} is set to {@code null}. On the next call to {@code apply}, an inactive
- * buffer is taken from {@code inactiveBuffers} and made the new active buffer. If both
- * buffers are still flushing, then the {@code apply} call throws {@link PleaseThrottleException}.
- */
- @GuardedBy("monitor")
- private Buffer activeBuffer;
-
- /**
- * The buffers. May either be active (pointed to by {@link #activeBuffer}),
- * inactive (in the {@link #inactiveBuffers} queue), or flushing.
- */
- private final Buffer bufferA = new Buffer();
- private final Buffer bufferB = new Buffer();
-
- /**
- * Queue containing flushed, inactive buffers. May be accessed from callbacks (I/O threads).
- * We restrict the session to only two buffers, so {@link BlockingQueue#add} can
- * be used without chance of failure.
- */
- private final BlockingQueue<Buffer> inactiveBuffers = new ArrayBlockingQueue<>(2, false);
-
- /**
- * Deferred used to notify on flush events. Atomically swapped and completed every time a buffer
- * is flushed. This can be used to notify handlers of {@link PleaseThrottleException} that more
- * capacity may be available in the active buffer.
- */
- private final AtomicReference<Deferred<Void>> flushNotification =
- new AtomicReference<>(new Deferred<Void>());
-
- /**
- * Tracks whether the session has been closed.
- */
- private volatile boolean closed = false;
-
- private boolean ignoreAllDuplicateRows = false;
-
- /**
-  * Package-private constructor; sessions are meant to be created through AsyncKuduClient.
-  * The session starts in {@link FlushMode#AUTO_FLUSH_SYNC} with {@code CLIENT_PROPAGATED}
-  * consistency, the client's default operation timeout, and both buffers parked in the
-  * inactive queue.
-  * @param client client that creates this session
-  */
- AsyncKuduSession(AsyncKuduClient client) {
-   this.client = client;
-   this.flushMode = FlushMode.AUTO_FLUSH_SYNC;
-   this.consistencyMode = CLIENT_PROPAGATED;
-   this.timeoutMs = client.getDefaultOperationTimeoutMs();
-   this.errorCollector = new ErrorCollector(mutationBufferSpace);
-   // Park both buffers before calling the watermark setter: it consults
-   // hasPendingOperations(), which inspects the inactive queue.
-   this.inactiveBuffers.add(bufferA);
-   this.inactiveBuffers.add(bufferB);
-   setMutationBufferLowWatermark(mutationBufferLowWatermarkPercentage);
- }
-
- /** @return the flush mode currently configured on this session */
- @Override
- public FlushMode getFlushMode() {
-   return flushMode;
- }
-
- /**
-  * Changes the flush mode. Refused while operations are still pending.
-  * @param flushMode the new flush mode
-  * @throws IllegalArgumentException if {@link #hasPendingOperations()} reports pending work
-  */
- @Override
- public void setFlushMode(FlushMode flushMode) {
-   final boolean writesBuffered = hasPendingOperations();
-   if (writesBuffered) {
-     throw new IllegalArgumentException("Cannot change flush mode when writes are buffered");
-   }
-   this.flushMode = flushMode;
- }
-
- /**
-  * Changes the external consistency mode. Refused while operations are still pending.
-  * @param consistencyMode the new consistency mode
-  * @throws IllegalArgumentException if {@link #hasPendingOperations()} reports pending work
-  */
- @Override
- public void setExternalConsistencyMode(ExternalConsistencyMode consistencyMode) {
-   final boolean writesBuffered = hasPendingOperations();
-   if (writesBuffered) {
-     throw new IllegalArgumentException(
-         "Cannot change consistency mode when writes are buffered");
-   }
-   this.consistencyMode = consistencyMode;
- }
-
- /**
-  * Sets the maximum number of operations a buffer may hold and recomputes the low watermark
-  * from the previously configured percentage. Refused while operations are still pending.
-  * @param size the new buffer capacity, in number of operations
-  * @throws IllegalArgumentException if {@link #hasPendingOperations()} reports pending work
-  */
- @Override
- public void setMutationBufferSpace(int size) {
-   if (hasPendingOperations()) {
-     throw new IllegalArgumentException(
-         "Cannot change the buffer size when operations are buffered");
-   }
-   mutationBufferSpace = size;
-   // The watermark is stored as an absolute count, so it has to follow the new capacity.
-   setMutationBufferLowWatermark(mutationBufferLowWatermarkPercentage);
- }
-
- /**
-  * Sets the low watermark as a fraction of the buffer capacity and stores the resulting
-  * absolute operation count (truncated to an int).
-  * @param mutationBufferLowWatermarkPercentage fraction in the closed range [0, 1]
-  * @throws IllegalArgumentException if operations are pending or the fraction is out of range
-  */
- @Override
- public void setMutationBufferLowWatermark(float mutationBufferLowWatermarkPercentage) {
-   if (hasPendingOperations()) {
-     throw new IllegalArgumentException(
-         "Cannot change the buffer low watermark when operations are buffered");
-   }
-   if (!PERCENTAGE_RANGE.contains(mutationBufferLowWatermarkPercentage)) {
-     throw new IllegalArgumentException("The low watermark must be between 0 and 1 inclusively");
-   }
-   this.mutationBufferLowWatermarkPercentage = mutationBufferLowWatermarkPercentage;
-   final float scaledWatermark = mutationBufferLowWatermarkPercentage * mutationBufferSpace;
-   this.mutationBufferLowWatermark = (int) scaledWatermark;
- }
-
- /**
- * Lets us set a specific seed for tests. The seed is applied to the randomizer that
- * {@link #apply} uses for its randomized low-watermark throttling decision.
- * @param seed the seed handed to {@link Random#setSeed(long)}
- */
- @VisibleForTesting
- void setRandomSeed(long seed) {
- this.randomizer.setSeed(seed);
- }
-
- /**
- * Sets the delay handed to the client's timer when {@link #apply} schedules a background
- * flush task for a buffer (presumably milliseconds, going by the class documentation's
- * "flush interval of 1000ms" and the default of 1000 -- confirm against the timer API).
- * @param interval the new flush interval
- */
- @Override
- public void setFlushInterval(int interval) {
- this.interval = interval;
- }
-
- /**
- * Sets the timeout this session applies to operations sent in AUTO_FLUSH_SYNC mode, to
- * batches sent on flush, and to tablet location lookups. A value of 0 makes the session
- * skip setting a timeout on operations and batches.
- * @param timeout the timeout in milliseconds
- */
- @Override
- public void setTimeoutMillis(long timeout) {
- this.timeoutMs = timeout;
- }
-
- /** @return the timeout, in milliseconds, currently configured on this session */
- @Override
- public long getTimeoutMillis() {
-   return timeoutMs;
- }
-
- /** @return {@code true} once {@link #close()} has been called on this session */
- @Override
- public boolean isClosed() {
- return closed;
- }
-
- /** @return the flag this session passes on to the operations and batches it sends */
- @Override
- public boolean isIgnoreAllDuplicateRows() {
- return ignoreAllDuplicateRows;
- }
-
- /**
- * Sets the flag that is passed on to operations sent in AUTO_FLUSH_SYNC mode and to the
- * batches created on flush.
- * @param ignoreAllDuplicateRows the new value of the flag
- */
- @Override
- public void setIgnoreAllDuplicateRows(boolean ignoreAllDuplicateRows) {
- this.ignoreAllDuplicateRows = ignoreAllDuplicateRows;
- }
-
- @Override
- public int countPendingErrors() {
- return errorCollector.countErrors();
- }
-
- @Override
- public RowErrorsAndOverflowStatus getPendingErrors() {
- return errorCollector.getErrors();
- }
-
- /**
-  * Marks this session as closed (unregistering it from the client on the first call) and
-  * flushes the buffered operations.
-  * See the javadoc on {@link #flush()} on how to deal with exceptions coming out of this method.
-  * @return a Deferred whose callback chain will be invoked when
-  * everything that was buffered at the time of the call has been flushed.
-  */
- public Deferred<List<OperationResponse>> close() {
-   final boolean firstClose = !closed;
-   if (firstClose) {
-     closed = true;
-     client.removeSession(this);
-   }
-   return flush();
- }
-
- /**
- * Returns a buffer to the inactive queue after flushing, completing both the buffer's own
- * flush notification and the session-wide {@link #flushNotification}.
- * @param buffer the buffer to return to the inactive queue.
- */
- private void queueBuffer(Buffer buffer) {
- // Complete the per-buffer notification first.
- buffer.callbackFlushNotification();
- // Swap in a fresh session-wide notification; the previous one is fired below, only after
- // the buffer is back in the inactive queue.
- Deferred<Void> localFlushNotification = flushNotification.getAndSet(new Deferred<Void>());
- inactiveBuffers.add(buffer);
- localFlushNotification.callback(null);
- }
-
- /**
- * Callback which waits for all tablet location lookups to complete, groups all operations into
- * batches by tablet, and dispatches them. When all of the batches are complete, a deferred is
- * fired and the buffer is added to the inactive queue.
- */
- private final class TabletLookupCB implements Callback<Void, Object> {
- private final AtomicInteger lookupsOutstanding;
- private final Buffer buffer;
- private final Deferred<List<BatchResponse>> deferred;
-
- public TabletLookupCB(Buffer buffer, Deferred<List<BatchResponse>> deferred) {
- this.lookupsOutstanding = new AtomicInteger(buffer.getOperations().size());
- this.buffer = buffer;
- this.deferred = deferred;
- }
-
- @Override
- public Void call(Object _void) throws Exception {
- if (lookupsOutstanding.decrementAndGet() != 0) return null;
-
- // The final tablet lookup is complete. Batch all of the buffered
- // operations into their respective tablet, and then send the batches.
-
- // Group the operations by tablet.
- Map<Slice, Batch> batches = new HashMap<>();
- List<OperationResponse> opsFailedInLookup = new ArrayList<>();
-
- for (BufferedOperation bufferedOp : buffer.getOperations()) {
- Operation operation = bufferedOp.getOperation();
- if (bufferedOp.tabletLookupFailed()) {
- Exception failure = bufferedOp.getTabletLookupFailure();
- RowError error;
- if (failure instanceof NonCoveredRangeException) {
- // TODO: this should be something different than NotFound so that
- // applications can distinguish from updates on missing rows.
- error = new RowError(Status.NotFound(failure.getMessage()), operation);
- } else {
- LOG.warn("unexpected tablet lookup failure for operation {}", operation, failure);
- error = new RowError(Status.RuntimeError(failure.getMessage()), operation);
- }
- OperationResponse response = new OperationResponse(0, null, 0, operation, error);
- // Add the row error to the error collector if the session is in background flush mode,
- // and complete the operation's deferred with the error response. The ordering between
- // adding to the error collector and completing the deferred should not matter since
- // applications should be using one or the other method for error handling, not both.
- if (flushMode == FlushMode.AUTO_FLUSH_BACKGROUND) {
- errorCollector.addError(error);
- }
- operation.callback(response);
- opsFailedInLookup.add(response);
- continue;
- }
- LocatedTablet tablet = bufferedOp.getTablet();
- Slice tabletId = new Slice(tablet.getTabletId());
-
- Batch batch = batches.get(tabletId);
- if (batch == null) {
- batch = new Batch(operation.getTable(), tablet, ignoreAllDuplicateRows);
- batches.put(tabletId, batch);
- }
- batch.add(operation);
- }
-
- List<Deferred<BatchResponse>> batchResponses = new ArrayList<>(batches.size() + 1);
- if (!opsFailedInLookup.isEmpty()) {
- batchResponses.add(Deferred.fromResult(new BatchResponse(opsFailedInLookup)));
- }
-
- for (Batch batch : batches.values()) {
- if (timeoutMs != 0) {
- batch.deadlineTracker.reset();
- batch.setTimeoutMillis(timeoutMs);
- }
- addBatchCallbacks(batch);
- batchResponses.add(client.sendRpcToTablet(batch));
- }
-
- // On completion of all batches, fire the completion deferred, and add the buffer
- // back to the inactive buffers queue. This frees it up for new inserts.
- AsyncUtil.addBoth(
- Deferred.group(batchResponses),
- new Callback<Void, Object>() {
- @Override
- public Void call(Object responses) {
- queueBuffer(buffer);
- deferred.callback(responses);
- return null;
- }
- });
-
- return null;
- }
- }
-
- /**
-  * Flushes the buffered writes: detaches the active buffer (if any) and flushes it, and also
-  * waits for the non-active buffers to finish their own flushes.
-  * @return a {@link Deferred} whose callback chain will be invoked when all applied operations at
-  * the time of the call have been flushed.
-  */
- public Deferred<List<OperationResponse>> flush() {
-   final Buffer toFlush;
-   final Deferred<Void> othersFlushed;
-   synchronized (monitor) {
-     // Must be obtained before activeBuffer is cleared: the helper inspects activeBuffer.
-     othersFlushed = getNonActiveFlushNotification();
-     toFlush = activeBuffer;
-     activeBuffer = null;
-   }
-
-   final Deferred<List<OperationResponse>> activeBufferFlush;
-   if (toFlush == null) {
-     activeBufferFlush =
-         Deferred.<List<OperationResponse>>fromResult(ImmutableList.<OperationResponse>of());
-   } else {
-     activeBufferFlush = doFlush(toFlush);
-   }
-
-   // Whatever the outcome of the other buffers' flushes, continue with the active buffer's.
-   final Callback<Deferred<List<OperationResponse>>, Object> thenActiveFlush =
-       new Callback<Deferred<List<OperationResponse>>, Object>() {
-         @Override
-         public Deferred<List<OperationResponse>> call(Object arg) {
-           return activeBufferFlush;
-         }
-       };
-   return AsyncUtil.addBothDeferring(othersFlushed, thenActiveFlush);
- }
-
- /**
- * Flushes a write buffer. This method takes ownership of the buffer, no other concurrent access
- * is allowed.
- *
- * @param buffer the buffer to flush, must not be modified once passed to this method
- * @return the operation responses
- */
- private Deferred<List<OperationResponse>> doFlush(Buffer buffer) {
- LOG.debug("flushing buffer: {}", buffer);
- if (buffer.getOperations().isEmpty()) {
- // no-op.
- return Deferred.<List<OperationResponse>>fromResult(ImmutableList.<OperationResponse>of());
- }
-
- Deferred<List<BatchResponse>> batchResponses = new Deferred<>();
- Callback<Void, Object> tabletLookupCB = new TabletLookupCB(buffer, batchResponses);
-
- for (BufferedOperation bufferedOperation : buffer.getOperations()) {
- AsyncUtil.addBoth(bufferedOperation.getTabletLookup(), tabletLookupCB);
- }
-
- return batchResponses.addCallback(ConvertBatchToListOfResponsesCB.getInstance());
- }
-
- /**
-  * Callback that flattens a list of BatchResponse into a single list of OperationResponse,
-  * since BatchResponse is an implementation detail that should not reach callers.
-  */
- private static class ConvertBatchToListOfResponsesCB implements Callback<List<OperationResponse>,
-     List<BatchResponse>> {
-   private static final ConvertBatchToListOfResponsesCB INSTANCE =
-       new ConvertBatchToListOfResponsesCB();
-
-   @Override
-   public List<OperationResponse> call(List<BatchResponse> batchResponses) throws Exception {
-     // Tally the total number of responses up front so the result list is allocated once
-     // instead of growing while it is filled.
-     int total = 0;
-     for (BatchResponse perBatch : batchResponses) {
-       total += perBatch.getIndividualResponses().size();
-     }
-
-     final ArrayList<OperationResponse> flattened = new ArrayList<>(total);
-     for (BatchResponse perBatch : batchResponses) {
-       flattened.addAll(perBatch.getIndividualResponses());
-     }
-     return flattened;
-   }
-
-   @Override
-   public String toString() {
-     return "ConvertBatchToListOfResponsesCB";
-   }
-
-   public static ConvertBatchToListOfResponsesCB getInstance() {
-     return INSTANCE;
-   }
- }
-
- /**
-  * Reports whether work is outstanding: either the active buffer holds operations, or at
-  * least one buffer is neither active nor parked in the inactive queue.
-  */
- @Override
- public boolean hasPendingOperations() {
-   synchronized (monitor) {
-     if (activeBuffer == null) {
-       // No active buffer: both buffers must be parked for the session to be idle.
-       return inactiveBuffers.size() < 2;
-     }
-     if (activeBuffer.getOperations().size() > 0) {
-       return true;
-     }
-     // Active buffer is empty; the other buffer must be parked for the session to be idle.
-     return !inactiveBufferAvailable();
-   }
- }
-
- /**
- * Apply the given operation.
- * The behavior of this function depends on the current flush mode. Regardless
- * of flush mode, however, Apply may begin to perform processing in the background
- * for the call (e.g looking up the tablet, etc).
- * <ul>
- * <li>AUTO_FLUSH_SYNC: the operation is sent to its tablet right away and the deferred of
- * that RPC is returned.</li>
- * <li>MANUAL_FLUSH: the operation is appended to the active buffer; once the buffer already
- * holds {@code mutationBufferSpace} operations a {@link NonRecoverableException} is thrown.</li>
- * <li>AUTO_FLUSH_BACKGROUND: the operation is appended to the active buffer; full buffers are
- * flushed, and a {@link PleaseThrottleException} is thrown when no buffer can take the
- * operation or when the randomized low-watermark check decides to throttle.</li>
- * </ul>
- * The row of the operation is frozen before anything else happens.
- * @param operation operation to apply
- * @return a Deferred to track this operation
- * @throws KuduException if an error happens or {@link PleaseThrottleException} is triggered
- */
- public Deferred<OperationResponse> apply(final Operation operation) throws KuduException {
- Preconditions.checkNotNull(operation, "Can not apply a null operation");
-
- // Freeze the row so that the client can not concurrently modify it while it is in flight.
- operation.getRow().freeze();
-
- // If immediate flush mode, send the operation directly.
- if (flushMode == FlushMode.AUTO_FLUSH_SYNC) {
- // A timeout of 0 means no timeout is set on the operation.
- if (timeoutMs != 0) {
- operation.setTimeoutMillis(timeoutMs);
- }
- operation.setExternalConsistencyMode(this.consistencyMode);
- operation.setIgnoreAllDuplicateRows(ignoreAllDuplicateRows);
- return client.sendRpcToTablet(operation);
- }
-
- // Kick off a location lookup.
- // Note that the lookup is started before the buffer checks below, so it is issued even
- // if this call ends up throwing.
- Deferred<LocatedTablet> tablet = client.getTabletLocation(operation.getTable(),
- operation.partitionKey(),
- timeoutMs);
-
- // Holds a buffer that should be flushed outside the synchronized block, if necessary.
- Buffer fullBuffer = null;
- try {
- synchronized (monitor) {
- if (activeBuffer == null) {
- // If the active buffer is null then we recently flushed. Check if there
- // is an inactive buffer available to replace as the active.
- if (inactiveBufferAvailable()) {
- refreshActiveBuffer();
- } else {
- Status statusServiceUnavailable =
- Status.ServiceUnavailable("All buffers are currently flushing");
- // This can happen if the user writes into a buffer, flushes it, writes
- // into the second, flushes it, and immediately tries to write again.
- throw new PleaseThrottleException(statusServiceUnavailable,
- null, operation, flushNotification.get());
- }
- }
-
- if (flushMode == FlushMode.MANUAL_FLUSH) {
- // In manual mode nothing is flushed implicitly: a full buffer is an error.
- if (activeBuffer.getOperations().size() < mutationBufferSpace) {
- activeBuffer.getOperations().add(new BufferedOperation(tablet, operation));
- } else {
- Status statusIllegalState =
- Status.IllegalState("MANUAL_FLUSH is enabled but the buffer is too big");
- throw new NonRecoverableException(statusIllegalState);
- }
- } else {
- assert flushMode == FlushMode.AUTO_FLUSH_BACKGROUND;
- int activeBufferSize = activeBuffer.getOperations().size();
-
- if (activeBufferSize >= mutationBufferSpace) {
- // Save the active buffer into fullBuffer so that it gets flushed when we leave this
- // synchronized block.
- fullBuffer = activeBuffer;
- activeBuffer = null;
- activeBufferSize = 0;
- if (inactiveBufferAvailable()) {
- refreshActiveBuffer();
- } else {
- // The full buffer is still flushed by the finally block below even though we throw.
- Status statusServiceUnavailable =
- Status.ServiceUnavailable("All buffers are currently flushing");
- throw new PleaseThrottleException(statusServiceUnavailable,
- null, operation, flushNotification.get());
- }
- }
-
- if (mutationBufferLowWatermark < mutationBufferSpace && // low watermark is enabled
- activeBufferSize >= mutationBufferLowWatermark && // buffer is over low water mark
- !inactiveBufferAvailable()) { // no inactive buffers
-
- // Check if we are over the low water mark.
- // The random term ranges over [0, space - lowWatermark), so the fuller the active
- // buffer is, the more likely the sum exceeds the capacity and the call is throttled.
- int randomWatermark = activeBufferSize + 1 +
- randomizer.nextInt(mutationBufferSpace -
- mutationBufferLowWatermark);
-
- if (randomWatermark > mutationBufferSpace) {
- Status statusServiceUnavailable =
- Status.ServiceUnavailable("The previous buffer hasn't been flushed and the " +
- "current buffer is over the low watermark, please retry later");
- throw new PleaseThrottleException(statusServiceUnavailable,
- null, operation, flushNotification.get());
- }
- }
-
- activeBuffer.getOperations().add(new BufferedOperation(tablet, operation));
-
- // activeBufferSize is the size before the add above, hence the + 1.
- if (activeBufferSize + 1 >= mutationBufferSpace && inactiveBufferAvailable()) {
- // If the operation filled the buffer, then flush it.
- Preconditions.checkState(fullBuffer == null);
- fullBuffer = activeBuffer;
- activeBuffer = null;
- activeBufferSize = 0;
- } else if (activeBufferSize == 0) {
- // If this is the first operation in the buffer, start a background flush timer.
- client.newTimeout(activeBuffer.getFlusherTask(), interval);
- }
- }
- }
- } finally {
- // Flush the buffer outside of the synchronized block, if required.
- if (fullBuffer != null) {
- doFlush(fullBuffer);
- }
- }
- return operation.getDeferred();
- }
-
- /**
-  * Tells whether the inactive queue currently holds at least one buffer.
-  * @return true if there is currently an inactive buffer available
-  */
- private boolean inactiveBufferAvailable() {
-   return !inactiveBuffers.isEmpty();
- }
-
- /**
- * Refreshes the active buffer. This should only be called after a
- * {@link #flush()} when the active buffer is {@code null}, there is an
- * inactive buffer available (see {@link #inactiveBufferAvailable()}, and
- * {@link #monitor} is locked.
- */
- @GuardedBy("monitor")
- private void refreshActiveBuffer() {
- Preconditions.checkState(activeBuffer == null);
- activeBuffer = inactiveBuffers.remove();
- activeBuffer.reset();
- }
-
- /**
- * Returns a flush notification for the currently non-active buffers.
- * This is used during manual {@link #flush} calls to ensure that all buffers (not just the active
- * buffer) are fully flushed before completing.
- */
- @GuardedBy("monitor")
- private Deferred<Void> getNonActiveFlushNotification() {
- final Deferred<Void> notificationA = bufferA.getFlushNotification();
- final Deferred<Void> notificationB = bufferB.getFlushNotification();
- if (activeBuffer == null) {
- // Both buffers are either flushing or inactive.
- return AsyncUtil.addBothDeferring(notificationA, new Callback<Deferred<Void>, Object>() {
- @Override
- public Deferred<Void> call(Object _obj) throws Exception {
- return notificationB;
- }
- });
- } else if (activeBuffer == bufferA) {
- return notificationB;
- } else {
- return notificationA;
- }
- }
-
- /**
-  * Registers the success and failure handlers of a batch on the batch's deferred. On success
-  * every operation in the batch receives its individual response, and row errors are
-  * additionally collected when in {@link FlushMode#AUTO_FLUSH_BACKGROUND}. On failure every
-  * operation of the batch receives the same exception.
-  * @param request the request for which we must handle the response
-  */
- private void addBatchCallbacks(final Batch request) {
-   final class BatchCallback implements Callback<BatchResponse, BatchResponse> {
-     @Override
-     public BatchResponse call(final BatchResponse response) {
-       LOG.trace("Got a Batch response for {} rows", request.operations.size());
-       // A write timestamp of 0 is treated as "none"; anything else is propagated to the client.
-       if (response.getWriteTimestamp() != 0) {
-         AsyncKuduSession.this.client.updateLastPropagatedTimestamp(response.getWriteTimestamp());
-       }
-
-       // Send individualized responses to all the operations in this batch.
-       for (OperationResponse operationResponse : response.getIndividualResponses()) {
-         operationResponse.getOperation().callback(operationResponse);
-         if (flushMode == FlushMode.AUTO_FLUSH_BACKGROUND && operationResponse.hasRowError()) {
-           errorCollector.addError(operationResponse.getRowError());
-         }
-       }
-
-       return response;
-     }
-
-     @Override
-     public String toString() {
-       return "apply batch response";
-     }
-   }
-
-   final class BatchErrCallback implements Callback<Exception, Exception> {
-     @Override
-     public Exception call(Exception e) {
-       // Send the same exception to all the operations.
-       for (Operation operation : request.operations) {
-         operation.errback(e);
-       }
-       return e;
-     }
-
-     @Override
-     public String toString() {
-       return "apply batch error response";
-     }
-   }
-
-   request.getDeferred().addCallbacks(new BatchCallback(), new BatchErrCallback());
- }
-
- /**
-  * A FlusherTask is created for each active buffer in mode
-  * {@link FlushMode#AUTO_FLUSH_BACKGROUND}. When the timer fires, the task flushes the active
-  * buffer only if that buffer's flusher task is this very instance; otherwise it does nothing.
-  */
- private final class FlusherTask implements TimerTask {
-   @Override
-   public void run(final Timeout timeout) {
-     Buffer buffer = null;
-     synchronized (monitor) {
-       if (activeBuffer == null) {
-         return;
-       }
-       // getFlusherTask() lazily creates a task when the buffer has none; such a fresh task
-       // is never this instance, so a stale timer leaves the active buffer untouched.
-       if (activeBuffer.getFlusherTask() == this) {
-         buffer = activeBuffer;
-         activeBuffer = null;
-       }
-     }
-
-     // Flush outside of the synchronized block.
-     if (buffer != null) {
-       doFlush(buffer);
-     }
-   }
- }
-
- /**
-  * A {@code Buffer} bundles a list of operations, an optional flush task, and a flush
-  * notification.
-  *
-  * The {@link #flusherTask} is used in mode {@link FlushMode#AUTO_FLUSH_BACKGROUND}: it is the
-  * background flusher task assigned to the buffer when the first operation is applied to it
-  * after it became active. When that task runs after the timeout, it checks that the currently
-  * active buffer's flusher task is itself before flushing. This protects against a background
-  * task waking up after one or more manual flushes and attempting to flush the active buffer.
-  *
-  * The {@link #flushNotification} deferred is used by manual {@link #flush}es to make sure that
-  * non-active buffers are fully flushed. It is completed when this buffer has been flushed, and
-  * replaced by a fresh deferred when the buffer is promoted from inactive to active, to
-  * indicate that the buffer is not yet flushed.
-  *
-  * Buffer is externally synchronized. While a buffer is the active buffer, {@link #monitor}
-  * synchronizes access to it.
-  */
- private final class Buffer {
-   private final List<BufferedOperation> operations = new ArrayList<>();
-
-   private FlusherTask flusherTask = null;
-
-   private Deferred<Void> flushNotification = Deferred.fromResult(null);
-
-   /** @return the live, mutable list of operations held by this buffer */
-   public List<BufferedOperation> getOperations() {
-     return operations;
-   }
-
-   /** @return this buffer's flusher task, created on first access */
-   @GuardedBy("monitor")
-   public FlusherTask getFlusherTask() {
-     if (flusherTask != null) {
-       return flusherTask;
-     }
-     flusherTask = new FlusherTask();
-     return flusherTask;
-   }
-
-   /**
-    * Returns a {@link Deferred} which will be completed when this buffer is flushed. If the
-    * buffer is inactive (its flush is complete and it has been enqueued into
-    * {@link #inactiveBuffers}), then the deferred will already be complete.
-    */
-   public Deferred<Void> getFlushNotification() {
-     return flushNotification;
-   }
-
-   /**
-    * Completes the buffer's flush notification. Should be called when the buffer has been
-    * successfully flushed.
-    */
-   public void callbackFlushNotification() {
-     LOG.trace("buffer flush notification fired: {}", this);
-     flushNotification.callback(null);
-   }
-
-   /**
-    * Clears the operations, drops the flusher task and installs a fresh flush notification.
-    * Should be called when the buffer is promoted from inactive to active.
-    */
-   @GuardedBy("monitor")
-   public void reset() {
-     LOG.trace("buffer reset: {}", this);
-     flusherTask = null;
-     flushNotification = new Deferred<>();
-     operations.clear();
-   }
-
-   @Override
-   public String toString() {
-     return MoreObjects.toStringHelper(this)
-         .add("operations", operations.size())
-         .add("flusherTask", flusherTask)
-         .add("flushNotification", flushNotification)
-         .toString();
-   }
- }
-
- /**
-  * Pairs a buffered operation with the state of its tablet location lookup.
-  */
- private static final class BufferedOperation {
-   /**
-    * Outcome of the lookup: a {@link LocatedTablet} on success, otherwise the failure
-    * exception. Stays {@code null} until the lookup completes.
-    */
-   private Object tablet = null;
-   private final Deferred<Void> tabletLookup;
-   private final Operation operation;
-
-   public BufferedOperation(Deferred<LocatedTablet> tablet,
-                            Operation operation) {
-     // Record whatever the lookup produces, result or failure, in the tablet field.
-     final Callback<Void, Object> recordOutcome = new Callback<Void, Object>() {
-       @Override
-       public Void call(final Object outcome) {
-         BufferedOperation.this.tablet = outcome;
-         return null;
-       }
-     };
-     this.tabletLookup = AsyncUtil.addBoth(tablet, recordOutcome);
-     this.operation = Preconditions.checkNotNull(operation);
-   }
-
-   /**
-    * @return {@code true} if the tablet lookup failed.
-    */
-   public boolean tabletLookupFailed() {
-     final boolean located = tablet instanceof LocatedTablet;
-     return !located;
-   }
-
-   /**
-    * @return the located tablet
-    * @throws ClassCastException if the tablet lookup failed,
-    * check with {@link #tabletLookupFailed} before calling
-    */
-   public LocatedTablet getTablet() {
-     return (LocatedTablet) tablet;
-   }
-
-   /**
-    * @return the cause of the failed lookup
-    * @throws ClassCastException if the tablet lookup succeeded,
-    * check with {@link #tabletLookupFailed} before calling
-    */
-   public Exception getTabletLookupFailure() {
-     return (Exception) tablet;
-   }
-
-   /** @return the deferred that completes once the lookup outcome has been recorded */
-   public Deferred<Void> getTabletLookup() {
-     return tabletLookup;
-   }
-
-   /** @return the buffered operation */
-   public Operation getOperation() {
-     return operation;
-   }
-
-   @Override
-   public String toString() {
-     return MoreObjects.toStringHelper(this)
-         .add("tablet", tablet)
-         .add("operation", operation)
-         .toString();
-   }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-client/src/main/java/org/kududb/client/Batch.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/Batch.java b/java/kudu-client/src/main/java/org/kududb/client/Batch.java
deleted file mode 100644
index ed1d870..0000000
--- a/java/kudu-client/src/main/java/org/kududb/client/Batch.java
+++ /dev/null
@@ -1,199 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.kududb.client;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.MoreObjects;
-import com.google.protobuf.Message;
-import com.google.protobuf.ZeroCopyLiteralByteString;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.kududb.WireProtocol;
-import org.kududb.annotations.InterfaceAudience;
-import org.kududb.client.Statistics.Statistic;
-import org.kududb.client.Statistics.TabletStatistics;
-import org.kududb.tserver.Tserver;
-import org.kududb.tserver.Tserver.TabletServerErrorPB;
-import org.kududb.util.Pair;
-import org.kududb.util.Slice;
-
-/**
- * Used internally to group Operations for a single tablet together before sending to the tablet
- * server.
- */
-@InterfaceAudience.Private
-class Batch extends KuduRpc<BatchResponse> {
-
-  /** Holds batched operations. */
-  final List<Operation> operations = new ArrayList<>();
-
-  /** The tablet this batch will be routed to. */
-  private final LocatedTablet tablet;
-
-  /**
-   * This size will be set when serialize is called. It stands for the size of rows in all
-   * operations in this batch. The initial value 0 doubles as the "not serialized yet" marker.
-   */
-  private long rowOperationsSizeBytes = 0;
-
-  /** See {@link SessionConfiguration#setIgnoreAllDuplicateRows(boolean)} */
-  private final boolean ignoreAllDuplicateRows;
-
-  /**
-   * Creates an empty batch.
-   * @param table table handed to the parent RPC
-   * @param tablet tablet whose partition bounds the operations added to this batch
-   * @param ignoreAllDuplicateRows whether a response whose row errors are all ALREADY_PRESENT
-   *                               should be reported without row errors
-   */
-  Batch(KuduTable table, LocatedTablet tablet, boolean ignoreAllDuplicateRows) {
-    super(table);
-    this.ignoreAllDuplicateRows = ignoreAllDuplicateRows;
-    this.tablet = tablet;
-  }
-
-  /**
-   * Returns the bytes size of this batch's row operations after serialization.
-   * @return size in bytes
-   * @throws IllegalStateException thrown if the recorded size is still 0, i.e. this RPC hasn't
-   *                               been serialized (e.g. sent to a TS)
-   */
-  long getRowOperationsSizeBytes() {
-    if (this.rowOperationsSizeBytes == 0) {
-      throw new IllegalStateException("This row hasn't been serialized yet");
-    }
-    return this.rowOperationsSizeBytes;
-  }
-
-  /**
-   * Appends an operation to this batch.
-   * When assertions are enabled, checks that the operation's partition key is not below the
-   * tablet's partition start key and is strictly below its end key; an empty end key skips the
-   * upper-bound check.
-   * @param operation operation to add
-   */
-  public void add(Operation operation) {
-    assert Bytes.memcmp(operation.partitionKey(),
-                        tablet.getPartition().getPartitionKeyStart()) >= 0 &&
-           (tablet.getPartition().getPartitionKeyEnd().length == 0 ||
-            Bytes.memcmp(operation.partitionKey(),
-                         tablet.getPartition().getPartitionKeyEnd()) < 0);
-
-    operations.add(operation);
-  }
-
-  /**
-   * Builds the write request for all batched operations and records the serialized size of the
-   * row operations (rows plus indirect data) for {@link #getRowOperationsSizeBytes()}.
-   * @param header RPC header to serialize ahead of the request
-   * @return the buffer holding the header and the write request
-   */
-  @Override
-  ChannelBuffer serialize(Message header) {
-    final Tserver.WriteRequestPB.Builder builder = Operation.createAndFillWriteRequestPB(operations);
-    rowOperationsSizeBytes = builder.getRowOperations().getRows().size() +
-                             builder.getRowOperations().getIndirectData().size();
-    builder.setTabletId(ZeroCopyLiteralByteString.wrap(getTablet().getTabletIdAsBytes()));
-    builder.setExternalConsistencyMode(externalConsistencyMode.pbVersion());
-    return toChannelBuffer(header, builder.build());
-  }
-
-  @Override
-  String serviceName() {
-    return TABLET_SERVER_SERVICE_NAME;
-  }
-
-  @Override
-  String method() {
-    return Operation.METHOD;
-  }
-
-  /**
-   * Parses the write response and wraps it in a {@link BatchResponse}.
-   * If {@code ignoreAllDuplicateRows} is set and every per-row error is ALREADY_PRESENT, the
-   * response is built without row errors. When a test has injected an error, that error is
-   * returned (after the optional injected delay) instead of the one found in the response.
-   * @param callResponse response received for this RPC
-   * @param tsUUID UUID passed on to the created response
-   * @return the batch response paired with the error to report, or {@code null} if there is none
-   */
-  @Override
-  Pair<BatchResponse, Object> deserialize(CallResponse callResponse,
-                                          String tsUUID) throws Exception {
-    Tserver.WriteResponsePB.Builder builder = Tserver.WriteResponsePB.newBuilder();
-    readProtobuf(callResponse.getPBMessage(), builder);
-
-    List<Tserver.WriteResponsePB.PerRowErrorPB> errorsPB = builder.getPerRowErrorsList();
-    if (ignoreAllDuplicateRows) {
-      boolean allAlreadyPresent = true;
-      for (Tserver.WriteResponsePB.PerRowErrorPB errorPB : errorsPB) {
-        if (errorPB.getError().getCode() != WireProtocol.AppStatusPB.ErrorCode.ALREADY_PRESENT) {
-          allAlreadyPresent = false;
-          break;
-        }
-      }
-      if (allAlreadyPresent) {
-        errorsPB = Collections.emptyList();
-      }
-    }
-
-    BatchResponse response = new BatchResponse(deadlineTracker.getElapsedMillis(), tsUUID,
-                                               builder.getTimestamp(), errorsPB, operations);
-
-    if (injectedError != null) {
-      if (injectedlatencyMs > 0) {
-        try {
-          Thread.sleep(injectedlatencyMs);
-        } catch (InterruptedException e) {
-          // Cut the simulated latency short, but keep the interrupt visible to the code that
-          // owns this thread instead of silently discarding it.
-          Thread.currentThread().interrupt();
-        }
-      }
-      return new Pair<BatchResponse, Object>(response, injectedError);
-    }
-
-    return new Pair<BatchResponse, Object>(response, builder.hasError() ? builder.getError() : null);
-  }
-
-  /**
-   * @return the start key of the partition of the tablet given at construction
-   */
-  @Override
-  public byte[] partitionKey() {
-    return tablet.getPartition().getPartitionKeyStart();
-  }
-
-  @Override
-  boolean isRequestTracked() {
-    return true;
-  }
-
-  /**
-   * Updates the per-tablet statistics for this batch.
-   * A {@code null} response counts every batched operation as an op error plus one RPC error.
-   * Otherwise one write RPC is counted, each individual response is counted as either an op
-   * error or a write op, and the serialized row size is added to the bytes written.
-   * @param statistics statistics holder to update
-   * @param response response received for this batch, or {@code null}
-   */
-  @Override
-  void updateStatistics(Statistics statistics, BatchResponse response) {
-    Slice tabletId = this.getTablet().getTabletId();
-    String tableName = this.getTable().getName();
-    TabletStatistics tabletStatistics = statistics.getTabletStatistics(tableName, tabletId);
-    if (response == null) {
-      tabletStatistics.incrementStatistic(Statistic.OPS_ERRORS, operations.size());
-      tabletStatistics.incrementStatistic(Statistic.RPC_ERRORS, 1);
-      return;
-    }
-    tabletStatistics.incrementStatistic(Statistic.WRITE_RPCS, 1);
-    for (OperationResponse opResponse : response.getIndividualResponses()) {
-      if (opResponse.hasRowError()) {
-        tabletStatistics.incrementStatistic(Statistic.OPS_ERRORS, 1);
-      } else {
-        tabletStatistics.incrementStatistic(Statistic.WRITE_OPS, 1);
-      }
-    }
-    tabletStatistics.incrementStatistic(Statistic.BYTES_WRITTEN, getRowOperationsSizeBytes());
-  }
-
-  @Override
-  public String toString() {
-    return MoreObjects.toStringHelper(this)
-                      .add("operations", operations.size())
-                      .add("tablet", tablet)
-                      .add("ignoreAllDuplicateRows", ignoreAllDuplicateRows)
-                      .toString();
-  }
-
-  // Test-only injection state. Both fields are static, so they are shared by every Batch.
-  private static TabletServerErrorPB injectedError;
-  private static int injectedlatencyMs;
-
-  /**
-   * Inject tablet server side error for Batch rpc related tests.
-   * @param error error response from tablet server
-   * @param latencyMs blocks response handling thread for some time to simulate
-   *                  write latency
-   */
-  @VisibleForTesting
-  static void injectTabletServerErrorAndLatency(TabletServerErrorPB error, int latencyMs) {
-    injectedError = error;
-    injectedlatencyMs = latencyMs;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-client/src/main/java/org/kududb/client/BatchResponse.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/BatchResponse.java b/java/kudu-client/src/main/java/org/kududb/client/BatchResponse.java
deleted file mode 100644
index f67153b..0000000
--- a/java/kudu-client/src/main/java/org/kududb/client/BatchResponse.java
+++ /dev/null
@@ -1,105 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.kududb.client;
-
-import com.google.common.collect.ImmutableList;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.kududb.annotations.InterfaceAudience;
-import org.kududb.tserver.Tserver;
-
-/**
- * Response type for Batch (which is used internally by AsyncKuduSession).
- * Carries the Hybrid Time write timestamp returned by the Tablet Server along with one
- * response per operation of the batch.
- */
-@InterfaceAudience.Private
-public class BatchResponse extends KuduRpcResponse {
-
-  private final long writeTimestamp;
-  private final List<RowError> rowErrors;
-  private final List<OperationResponse> individualResponses;
-
-  /**
-   * Package-private constructor to be used by the RPCs.
-   * @param elapsedMillis time in milliseconds since RPC creation to now
-   * @param tsUUID UUID passed to the parent response and to every individual response
-   * @param writeTimestamp HT's write timestamp
-   * @param errorsPB a list of row errors, can be empty
-   * @param operations the list of operations which created this response
-   */
-  BatchResponse(long elapsedMillis, String tsUUID, long writeTimestamp,
-                List<Tserver.WriteResponsePB.PerRowErrorPB> errorsPB,
-                List<Operation> operations) {
-    super(elapsedMillis, tsUUID);
-    this.writeTimestamp = writeTimestamp;
-
-    final int operationCount = operations.size();
-    final int errorCount = errorsPB.size();
-    this.individualResponses = new ArrayList<>(operationCount);
-    if (errorCount > 0) {
-      this.rowErrors = new ArrayList<>(errorCount);
-    } else {
-      this.rowErrors = Collections.emptyList();
-    }
-
-    // Walk the operations once, pairing each with its row error when it has one. Not every
-    // row has an error, but 'errorsPB' lists the errors in the same order as the operations
-    // that were sent, so a single cursor into it is enough.
-    int nextError = 0;
-    for (int opIndex = 0; opIndex < operationCount; opIndex++) {
-      final Operation op = operations.get(opIndex);
-      RowError rowError = null;
-      if (nextError < errorCount) {
-        final Tserver.WriteResponsePB.PerRowErrorPB candidate = errorsPB.get(nextError);
-        if (candidate.getRowIndex() == opIndex) {
-          rowError = RowError.fromRowErrorPb(candidate, op, tsUUID);
-          rowErrors.add(rowError);
-          nextError++;
-        }
-      }
-      final long opElapsedMillis = op.deadlineTracker.getElapsedMillis();
-      individualResponses.add(
-          new OperationResponse(opElapsedMillis, tsUUID, writeTimestamp, op, rowError));
-    }
-    assert (rowErrors.size() == errorsPB.size());
-    assert (individualResponses.size() == operations.size());
-  }
-
-  /**
-   * Builds a response directly from already-created individual responses. The elapsed time
-   * and write timestamp are 0, the UUID is {@code null} and there are no row errors.
-   * @param individualResponses responses to expose, stored as given
-   */
-  BatchResponse(List<OperationResponse> individualResponses) {
-    super(0, null);
-    this.writeTimestamp = 0;
-    this.rowErrors = ImmutableList.of();
-    this.individualResponses = individualResponses;
-  }
-
-  /**
-   * Gives the write timestamp that was returned by the Tablet Server.
-   * @return a timestamp in milliseconds, 0 if the external consistency mode set in AsyncKuduSession
-   * wasn't CLIENT_PROPAGATED
-   */
-  public long getWriteTimestamp() {
-    return this.writeTimestamp;
-  }
-
-  /**
-   * Package-private accessor for the per-operation responses.
-   * @return a list of OperationResponses
-   */
-  List<OperationResponse> getIndividualResponses() {
-    return this.individualResponses;
-  }
-
-}