You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by jd...@apache.org on 2016/07/25 17:15:24 UTC

[19/36] incubator-kudu git commit: [java-client] repackage to org.apache.kudu (Part 1)

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduScanner.java
deleted file mode 100644
index 7699536..0000000
--- a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduScanner.java
+++ /dev/null
@@ -1,894 +0,0 @@
-/*
- * Copyright (C) 2010-2012  The Async HBase Authors.  All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *   - Redistributions of source code must retain the above copyright notice,
- *     this list of conditions and the following disclaimer.
- *   - Redistributions in binary form must reproduce the above copyright notice,
- *     this list of conditions and the following disclaimer in the documentation
- *     and/or other materials provided with the distribution.
- *   - Neither the name of the StumbleUpon nor the names of its contributors
- *     may be used to endorse or promote products derived from this software
- *     without specific prior written permission.
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-package org.kududb.client;
-
-import com.google.common.collect.ImmutableList;
-import com.google.protobuf.Message;
-import com.google.protobuf.ZeroCopyLiteralByteString;
-import com.stumbleupon.async.Callback;
-import com.stumbleupon.async.Deferred;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.kududb.ColumnSchema;
-import org.kududb.Common;
-import org.kududb.Schema;
-import org.kududb.annotations.InterfaceAudience;
-import org.kududb.annotations.InterfaceStability;
-import org.kududb.tserver.Tserver;
-import org.kududb.util.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static org.kududb.tserver.Tserver.NewScanRequestPB;
-import static org.kududb.tserver.Tserver.ScanRequestPB;
-import static org.kududb.tserver.Tserver.ScanResponsePB;
-import static org.kududb.tserver.Tserver.TabletServerErrorPB;
-
-/**
- * Creates a scanner to read data from Kudu.
- * <p>
- * This class is <strong>not synchronized</strong> as it's expected to be
- * used from a single thread at a time. It's rarely (if ever?) useful to
- * scan concurrently from a shared scanner using multiple threads. If you
- * want to optimize large table scans using extra parallelism, create a few
- * scanners and give each of them a partition of the table to scan. Or use
- * MapReduce.
- * <p>
- * There's no method in this class to explicitly open the scanner. It will open
- * itself automatically when you start scanning by calling {@link #nextRows()}.
- * Also, the scanner will automatically call {@link #close} when it reaches the
- * end key. If, however, you would like to stop scanning <i>before reaching the
- * end key</i>, you <b>must</b> call {@link #close} before disposing of the scanner.
- * Note that it's always safe to call {@link #close} on a scanner.
- * <p>
- * An {@code AsyncKuduScanner} is not re-usable. Should you want to scan the same rows
- * or the same table again, you must create a new one.
- *
- * <h1>A note on passing {@code byte} arrays in argument</h1>
- * None of the methods that receive a {@code byte[]} as an argument will copy it.
- * For more info, please refer to the documentation of {@link KuduRpc}.
- * <h1>A note on passing {@code String}s in argument</h1>
- * All strings are assumed to use the platform's default charset.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Unstable
-public final class AsyncKuduScanner {
-
-  private static final Logger LOG = LoggerFactory.getLogger(AsyncKuduScanner.class);
-
-  /**
-   * The possible read modes for scanners.
-   */
-  @InterfaceAudience.Public
-  @InterfaceStability.Evolving
-  public enum ReadMode {
-    /**
-     * When READ_LATEST is specified the server will always return committed writes at
-     * the time the request was received. This type of read does not return a snapshot
-     * timestamp and is not repeatable.
-     *
-     * In ACID terms this corresponds to Isolation mode: "Read Committed"
-     *
-     * This is the default mode.
-     */
-    READ_LATEST(Common.ReadMode.READ_LATEST),
-
-    /**
-     * When READ_AT_SNAPSHOT is specified the server will attempt to perform a read
-     * at the provided timestamp. If no timestamp is provided the server will take the
-     * current time as the snapshot timestamp. In this mode reads are repeatable, i.e.
-     * all future reads at the same timestamp will yield the same data. This is
-     * performed at the expense of waiting for in-flight transactions whose timestamp
-     * is lower than the snapshot's timestamp to complete, so it might incur a latency
-     * penalty.
-     *
-     * In ACID terms this, by itself, corresponds to Isolation mode "Repeatable
-     * Read". If all writes to the scanned tablet are made externally consistent,
-     * then this corresponds to Isolation mode "Strict-Serializable".
-     *
-     * Note: there are currently "holes", which happen in rare edge conditions, by which writes
-     * are sometimes not externally consistent even when action was taken to make them so.
-     * In these cases Isolation may degenerate to mode "Read Committed". See KUDU-430.
-     */
-    READ_AT_SNAPSHOT(Common.ReadMode.READ_AT_SNAPSHOT);
-
-    private Common.ReadMode pbVersion;
-    ReadMode(Common.ReadMode pbVersion) {
-      this.pbVersion = pbVersion;
-    }
-
-    @InterfaceAudience.Private
-    public Common.ReadMode pbVersion() {
-      return this.pbVersion;
-    }
-  }
-
-  //////////////////////////
-  // Initial configurations.
-  //////////////////////////
-
-  private final AsyncKuduClient client;
-  private final KuduTable table;
-  private final Schema schema;
-
-  /**
-   * Map of column name to predicate.
-   */
-  private final Map<String, KuduPredicate> predicates;
-
-  /**
-   * Maximum number of bytes returned by the scanner, on each batch.
-   */
-  private final int batchSizeBytes;
-
-  /**
-   * The maximum number of rows to scan.
-   */
-  private final long limit;
-
-  /**
-   * The start partition key of the next tablet to scan.
-   *
-   * Each time the scan exhausts a tablet, this is updated to that tablet's end partition key.
-   */
-  private byte[] nextPartitionKey;
-
-  /**
-   * The end partition key of the last tablet to scan.
-   */
-  private final byte[] endPartitionKey;
-
-  /**
-   * Set in the builder. If it's not set by the user, it will default to EMPTY_ARRAY.
-   * The field is final, so it's never modified after that; the same value is sent as the start
-   * primary key each time a scanner is opened on a tablet.
-   */
-  private final byte[] startPrimaryKey;
-
-  /**
-   * Set in the builder. If it's not set by the user, it will default to EMPTY_ARRAY.
-   * It's never modified after that.
-   */
-  private final byte[] endPrimaryKey;
-
-  private final boolean prefetching;
-
-  private final boolean cacheBlocks;
-
-  private final ReadMode readMode;
-
-  private final Common.OrderMode orderMode;
-
-  private final long htTimestamp;
-
-  /////////////////////
-  // Runtime variables.
-  /////////////////////
-
-  private boolean closed = false;
-
-  private boolean hasMore = true;
-
-  /**
-   * The tabletSlice currently being scanned.
-   * If null, we haven't started scanning.
-   * If == DONE, then we're done scanning.
-   * Otherwise it contains a proper tabletSlice name, and we're currently scanning.
-   */
-  private AsyncKuduClient.RemoteTablet tablet;
-
-  /**
-   * This is the scanner ID we got from the TabletServer.
-   * It's generated randomly so any value is possible.
-   */
-  private byte[] scannerId;
-
-  /**
-   * The sequence ID of this call. The sequence ID should start at 0
-   * with the request for a new scanner, and after each successful request,
-   * the client should increment it by 1. When retrying a request, the client
-   * should _not_ increment this value. If the server detects that the client
-   * missed a chunk of rows from the middle of a scan, it will respond with an
-   * error.
-   */
-  private int sequenceId;
-
-  private Deferred<RowResultIterator> prefetcherDeferred;
-
-  private boolean inFirstTablet = true;
-
-  final long scanRequestTimeout;
-
-  private static final AtomicBoolean PARTITION_PRUNE_WARN = new AtomicBoolean(true);
-
-  AsyncKuduScanner(AsyncKuduClient client, KuduTable table, List<String> projectedNames,
-                   List<Integer> projectedIndexes, ReadMode readMode, Common.OrderMode orderMode,
-                   long scanRequestTimeout,
-                   Map<String, KuduPredicate> predicates, long limit,
-                   boolean cacheBlocks, boolean prefetching,
-                   byte[] startPrimaryKey, byte[] endPrimaryKey,
-                   byte[] startPartitionKey, byte[] endPartitionKey,
-                   long htTimestamp, int batchSizeBytes) {
-    checkArgument(batchSizeBytes > 0, "Need a strictly positive number of bytes, " +
-        "got %s", batchSizeBytes);
-    checkArgument(limit > 0, "Need a strictly positive number for the limit, " +
-        "got %s", limit);
-    if (htTimestamp != AsyncKuduClient.NO_TIMESTAMP) {
-      checkArgument(htTimestamp >= 0, "Need non-negative number for the scan, " +
-          " timestamp got %s", htTimestamp);
-      checkArgument(readMode == ReadMode.READ_AT_SNAPSHOT, "When specifying a " +
-          "HybridClock timestamp, the read mode needs to be set to READ_AT_SNAPSHOT");
-    }
-    if (orderMode == Common.OrderMode.ORDERED) {
-      checkArgument(readMode == ReadMode.READ_AT_SNAPSHOT, "Returning rows in primary key order " +
-          "requires the read mode to be set to READ_AT_SNAPSHOT");
-    }
-
-    this.client = client;
-    this.table = table;
-    this.readMode = readMode;
-    this.orderMode = orderMode;
-    this.scanRequestTimeout = scanRequestTimeout;
-    this.predicates = predicates;
-    this.limit = limit;
-    this.cacheBlocks = cacheBlocks;
-    this.prefetching = prefetching;
-    this.startPrimaryKey = startPrimaryKey;
-    this.endPrimaryKey = endPrimaryKey;
-    this.htTimestamp = htTimestamp;
-    this.batchSizeBytes = batchSizeBytes;
-
-    if (!table.getPartitionSchema().isSimpleRangePartitioning() &&
-        (startPrimaryKey != AsyncKuduClient.EMPTY_ARRAY ||
-         endPrimaryKey != AsyncKuduClient.EMPTY_ARRAY) &&
-        PARTITION_PRUNE_WARN.getAndSet(false)) {
-      LOG.warn("Starting full table scan. " +
-               "In the future this scan may be automatically optimized with partition pruning.");
-    }
-
-    if (table.getPartitionSchema().isSimpleRangePartitioning()) {
-      // If the table is simple range partitioned, then the partition key space
-      // is isomorphic to the primary key space. We can potentially reduce the
-      // scan length by only scanning the intersection of the primary key range
-      // and the partition key range. This is a stop-gap until real partition
-      // pruning is in place that can work across any partitioning type.
-
-      if ((endPartitionKey.length != 0 && Bytes.memcmp(startPrimaryKey, endPartitionKey) >= 0) ||
-          (endPrimaryKey.length != 0 && Bytes.memcmp(startPartitionKey, endPrimaryKey) >= 0)) {
-        // The primary key range and the partition key range do not intersect;
-        // the scan will be empty.
-        this.nextPartitionKey = startPartitionKey;
-        this.endPartitionKey = endPartitionKey;
-      } else {
-        // Assign the scan's partition key range to the intersection of the
-        // primary key and partition key ranges.
-        if (Bytes.memcmp(startPartitionKey, startPrimaryKey) < 0) {
-          this.nextPartitionKey = startPrimaryKey;
-        } else {
-          this.nextPartitionKey = startPartitionKey;
-        }
-        if (endPrimaryKey.length != 0 && Bytes.memcmp(endPartitionKey, endPrimaryKey) > 0) {
-          this.endPartitionKey = endPrimaryKey;
-        } else {
-          this.endPartitionKey = endPartitionKey;
-        }
-      }
-    } else {
-      this.nextPartitionKey = startPartitionKey;
-      this.endPartitionKey = endPartitionKey;
-    }
-
-    // Map the column names to actual columns in the table schema.
-    // If the user set this to 'null', we scan all columns.
-    if (projectedNames != null) {
-      List<ColumnSchema> columns = new ArrayList<ColumnSchema>();
-      for (String columnName : projectedNames) {
-        ColumnSchema originalColumn = table.getSchema().getColumn(columnName);
-        columns.add(getStrippedColumnSchema(originalColumn));
-      }
-      this.schema = new Schema(columns);
-    } else if (projectedIndexes != null) {
-      List<ColumnSchema> columns = new ArrayList<ColumnSchema>();
-      for (Integer columnIndex : projectedIndexes) {
-        ColumnSchema originalColumn = table.getSchema().getColumnByIndex(columnIndex);
-        columns.add(getStrippedColumnSchema(originalColumn));
-      }
-      this.schema = new Schema(columns);
-    } else {
-      this.schema = table.getSchema();
-    }
-
-    // If any of the column predicates are of type None (the predicate is known
-    // to match no rows), then the scan can be short circuited without
-    // contacting any tablet servers.
-    boolean shortCircuit = false;
-    for (KuduPredicate predicate : this.predicates.values()) {
-      if (predicate.getType() == KuduPredicate.PredicateType.NONE) {
-        shortCircuit = true;
-        break;
-      }
-    }
-    if (shortCircuit) {
-      LOG.debug("Short circuiting scan with predicates: {}", predicates.values());
-      this.hasMore = false;
-      this.closed = true;
-    }
-  }
-
-  /**
-   * Clone the given column schema instance. The new instance will include only the name, type, and
-   * nullability of the passed one.
-   * @return a new column schema
-   */
-  private static ColumnSchema getStrippedColumnSchema(ColumnSchema columnToClone) {
-    return new ColumnSchema.ColumnSchemaBuilder(columnToClone.getName(), columnToClone.getType())
-        .nullable(columnToClone.isNullable())
-        .build();
-  }
-
-  /**
-   * Returns the maximum number of rows that this scanner was configured to return.
-   * @return a long representing the maximum number of rows that can be returned
-   */
-  public long getLimit() {
-    return this.limit;
-  }
-
-  /**
-   * Tells if the last rpc returned that there might be more rows to scan.
-   * @return true if there might be more data to scan, else false
-   */
-  public boolean hasMoreRows() {
-    return this.hasMore;
-  }
-
-  /**
-   * Returns if this scanner was configured to cache data blocks or not.
-   * @return true if this scanner will cache blocks, else false.
-   */
-  public boolean getCacheBlocks() {
-    return this.cacheBlocks;
-  }
-
-  /**
-   * Returns the maximum number of bytes returned by the scanner, on each batch.
-   * @return a long representing the maximum number of bytes that a scanner can receive at once
-   * from a tablet server
-   */
-  public long getBatchSizeBytes() {
-    return this.batchSizeBytes;
-  }
-
-  /**
-   * Returns the ReadMode for this scanner.
-   * @return the configured read mode for this scanner
-   */
-  public ReadMode getReadMode() {
-    return this.readMode;
-  }
-
-  private Common.OrderMode getOrderMode() {
-    return this.orderMode;
-  }
-
-  /**
-   * Returns the projection schema of this scanner. If specific columns were
-   * not specified during scanner creation, the table schema is returned.
-   * @return the projection schema for this scanner
-   */
-  public Schema getProjectionSchema() {
-    return this.schema;
-  }
-
-  long getSnapshotTimestamp() {
-    return this.htTimestamp;
-  }
-
-  /**
-   * Scans a number of rows.
-   * <p>
-   * Once this method has returned {@code null} (which indicates that this
-   * {@code Scanner} is done scanning), calling it again leads to undefined
-   * behavior.
-   * @return a deferred list of rows.
-   */
-  public Deferred<RowResultIterator> nextRows() {
-    if (closed) {  // We're already done scanning.
-      return Deferred.fromResult(null);
-    } else if (tablet == null) {
-
-      Callback<Deferred<RowResultIterator>, AsyncKuduScanner.Response> cb =
-          new Callback<Deferred<RowResultIterator>, Response>() {
-        @Override
-        public Deferred<RowResultIterator> call(Response resp) throws Exception {
-          if (!resp.more || resp.scanner_id == null) {
-            scanFinished();
-            return Deferred.fromResult(resp.data); // there might be data to return
-          }
-          scannerId = resp.scanner_id;
-          sequenceId++;
-          hasMore = resp.more;
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Scanner " + Bytes.pretty(scannerId) + " opened on " + tablet);
-          }
-          return Deferred.fromResult(resp.data);
-        }
-        public String toString() {
-          return "scanner opened";
-        }
-      };
-
-      Callback<Deferred<RowResultIterator>, Exception> eb =
-          new Callback<Deferred<RowResultIterator>, Exception>() {
-        @Override
-        public Deferred<RowResultIterator> call(Exception e) throws Exception {
-          invalidate();
-          if (e instanceof NonCoveredRangeException) {
-            NonCoveredRangeException ncre = (NonCoveredRangeException) e;
-            nextPartitionKey = ncre.getNonCoveredRangeEnd();
-
-            // Stop scanning if the non-covered range is past the end partition key.
-            if (ncre.getNonCoveredRangeEnd().length == 0
-                || (endPartitionKey != AsyncKuduClient.EMPTY_ARRAY
-                && Bytes.memcmp(endPartitionKey, ncre.getNonCoveredRangeEnd()) <= 0)) {
-              hasMore = false;
-              closed = true; // the scanner is closed on the other side at this point
-              return Deferred.fromResult(RowResultIterator.empty());
-            }
-            nextPartitionKey = ncre.getNonCoveredRangeEnd();
-            scannerId = null;
-            sequenceId = 0;
-            return nextRows();
-          } else {
-            LOG.warn("Can not open scanner", e);
-            // Don't let the scanner think it's opened on this tablet.
-            return Deferred.fromError(e); // Let the error propagate.
-          }
-        }
-        public String toString() {
-          return "open scanner errback";
-        }
-      };
-
-      // We need to open the scanner first.
-      return client.sendRpcToTablet(getOpenRequest()).addCallbackDeferring(cb).addErrback(eb);
-    } else if (prefetching && prefetcherDeferred != null) {
-      // TODO KUDU-1260 - Check if this works and add a test
-      prefetcherDeferred.chain(new Deferred<RowResultIterator>().addCallback(prefetch));
-      return prefetcherDeferred;
-    }
-    final Deferred<RowResultIterator> d =
-        client.scanNextRows(this).addCallbacks(got_next_row, nextRowErrback());
-    if (prefetching) {
-      d.chain(new Deferred<RowResultIterator>().addCallback(prefetch));
-    }
-    return d;
-  }
-
-  private final Callback<RowResultIterator, RowResultIterator> prefetch =
-      new Callback<RowResultIterator, RowResultIterator>() {
-    @Override
-    public RowResultIterator call(RowResultIterator arg) throws Exception {
-      if (hasMoreRows()) {
-        prefetcherDeferred = client.scanNextRows(AsyncKuduScanner.this).addCallbacks
-            (got_next_row, nextRowErrback());
-      }
-      return null;
-    }
-  };
-
-  /**
-   * Singleton callback to handle responses of "next" RPCs.
-   * This returns the {@link RowResultIterator} carried by the response, after
-   * updating the scanner's state (sequence ID, or end-of-tablet bookkeeping).
-   */
-  private final Callback<RowResultIterator, Response> got_next_row =
-      new Callback<RowResultIterator, Response>() {
-        public RowResultIterator call(final Response resp) {
-          if (!resp.more) {  // We're done scanning this tablet.
-            scanFinished();
-            return resp.data;
-          }
-          sequenceId++;
-          hasMore = resp.more;
-          //LOG.info("Scan.next is returning rows: " + resp.data.getNumRows());
-          return resp.data;
-        }
-        public String toString() {
-          return "get nextRows response";
-        }
-      };
-
-  /**
-   * Creates a new errback to handle errors while trying to get more rows.
-   */
-  private final Callback<Exception, Exception> nextRowErrback() {
-    return new Callback<Exception, Exception>() {
-      public Exception call(final Exception error) {
-        final AsyncKuduClient.RemoteTablet old_tablet = tablet;  // Save before invalidate().
-        String message = old_tablet + " pretends to not know " + AsyncKuduScanner.this;
-        LOG.warn(message, error);
-        invalidate();  // If there was an error, don't assume we're still OK.
-        return error;  // Let the error propagate.
-      }
-      public String toString() {
-        return "NextRow errback";
-      }
-    };
-  }
-
-  void scanFinished() {
-    Partition partition = tablet.getPartition();
-    // Stop scanning if we have scanned until or past the end partition key.
-    if (partition.isEndPartition()
-        || (this.endPartitionKey != AsyncKuduClient.EMPTY_ARRAY
-            && Bytes.memcmp(this.endPartitionKey, partition.getPartitionKeyEnd()) <= 0)) {
-      hasMore = false;
-      closed = true; // the scanner is closed on the other side at this point
-      return;
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Done scanning tablet {} for partition {} with scanner id {}",
-                tablet.getTabletIdAsString(), tablet.getPartition(), Bytes.pretty(scannerId));
-    }
-    nextPartitionKey = partition.getPartitionKeyEnd();
-    scannerId = null;
-    sequenceId = 0;
-    invalidate();
-  }
-
-  /**
-   * Closes this scanner (don't forget to call this when you're done with it!).
-   * <p>
-   * Closing a scanner already closed has no effect.  The deferred returned
-   * will be called back immediately.
-   * @return A deferred object that indicates the completion of the request.
-   * The {@link Object} can be null, a RowResultIterator if there was data left
-   * in the scanner, or an Exception.
-   */
-  public Deferred<RowResultIterator> close() {
-    if (closed) {
-      return Deferred.fromResult(null);
-    }
-    final Deferred<RowResultIterator> d =
-       client.closeScanner(this).addCallback(closedCallback()); // TODO errBack ?
-    return d;
-  }
-
-  /** Callback+Errback invoked when the TabletServer closed our scanner.  */
-  private Callback<RowResultIterator, Response> closedCallback() {
-    return new Callback<RowResultIterator, Response>() {
-      public RowResultIterator call(Response response) {
-        closed = true;
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Scanner " + Bytes.pretty(scannerId) + " closed on "
-              + tablet);
-        }
-        tablet = null;
-        scannerId = "client debug closed".getBytes();   // Make debugging easier.
-        return response == null ? null : response.data;
-      }
-      public String toString() {
-        return "scanner closed";
-      }
-    };
-  }
-
-  public String toString() {
-    final String tablet = this.tablet == null ? "null" : this.tablet.getTabletIdAsString();
-    final StringBuilder buf = new StringBuilder();
-    buf.append("KuduScanner(table=");
-    buf.append(table.getName());
-    buf.append(", tablet=").append(tablet);
-    buf.append(", scannerId=").append(Bytes.pretty(scannerId));
-    buf.append(", scanRequestTimeout=").append(scanRequestTimeout);
-    buf.append(')');
-    return buf.toString();
-  }
-
-  // ---------------------- //
-  // Package private stuff. //
-  // ---------------------- //
-
-  KuduTable table() {
-    return table;
-  }
-
-  /**
-   * Sets the name of the tabletSlice that's hosting {@code this.start_key}.
-   * @param tablet The tabletSlice we're currently supposed to be scanning.
-   */
-  void setTablet(final AsyncKuduClient.RemoteTablet tablet) {
-    this.tablet = tablet;
-  }
-
-  /**
-   * Invalidates this scanner and makes it assume it's no longer opened.
-   * When a TabletServer goes away while we're scanning it, or some other type
-   * of access problem happens, this method should be called so that the
-   * scanner will have to re-locate the TabletServer and re-open itself.
-   */
-  void invalidate() {
-    tablet = null;
-  }
-
-  /**
-   * Returns the tabletSlice currently being scanned, if any.
-   */
-  AsyncKuduClient.RemoteTablet currentTablet() {
-    return tablet;
-  }
-
-  /**
-   * Returns an RPC to open this scanner.
-   */
-  KuduRpc<Response> getOpenRequest() {
-    checkScanningNotStarted();
-    // This is the only point where we know we haven't started scanning and where the scanner
-    // should be fully configured
-    if (this.inFirstTablet) {
-      this.inFirstTablet = false;
-    }
-    return new ScanRequest(table, State.OPENING);
-  }
-
-  /**
-   * Returns an RPC to fetch the next rows.
-   */
-  KuduRpc<Response> getNextRowsRequest() {
-    return new ScanRequest(table, State.NEXT);
-  }
-
-  /**
-   * Returns an RPC to close this scanner.
-   */
-  KuduRpc<Response> getCloseRequest() {
-    return new ScanRequest(table, State.CLOSING);
-  }
-
-  /**
-   * Throws an exception if scanning already started.
-   * @throws IllegalStateException if scanning already started.
-   */
-  private void checkScanningNotStarted() {
-    if (tablet != null) {
-      throw new IllegalStateException("scanning already started");
-    }
-  }
-
-  /**
-   *  Helper object that contains all the info sent by a TS after a Scan request.
-   */
-  static final class Response {
-    /** The ID associated with the scanner that issued the request.  */
-    private final byte[] scanner_id;
-    /** The actual payload of the response.  */
-    private final RowResultIterator data;
-
-    /**
-     * If false, the filter we use decided there was no more data to scan.
-     * In this case, the server has automatically closed the scanner for us,
-     * so we don't need to explicitly close it.
-     */
-    private final boolean more;
-
-    Response(final byte[] scanner_id,
-             final RowResultIterator data,
-             final boolean more) {
-      this.scanner_id = scanner_id;
-      this.data = data;
-      this.more = more;
-    }
-
-    public String toString() {
-      return "AsyncKuduScanner$Response(scannerId=" + Bytes.pretty(scanner_id)
-          + ", data=" + data + ", more=" + more +  ") ";
-    }
-  }
-
-  private enum State {
-    OPENING,
-    NEXT,
-    CLOSING
-  }
-
-  /**
-   * RPC sent out to fetch the next rows from the TabletServer.
-   */
-  private final class ScanRequest extends KuduRpc<Response> {
-
-    State state;
-
-    ScanRequest(KuduTable table, State state) {
-      super(table);
-      this.state = state;
-      this.setTimeoutMillis(scanRequestTimeout);
-    }
-
-    @Override
-    String serviceName() { return TABLET_SERVER_SERVICE_NAME; }
-
-    @Override
-    String method() {
-      return "Scan";
-    }
-
-    @Override
-    Collection<Integer> getRequiredFeatures() {
-      if (predicates.isEmpty()) {
-        return ImmutableList.of();
-      } else {
-        return ImmutableList.of(Tserver.TabletServerFeatures.COLUMN_PREDICATES_VALUE);
-      }
-    }
-
-    /** Serializes this request.  */
-    ChannelBuffer serialize(Message header) {
-      final ScanRequestPB.Builder builder = ScanRequestPB.newBuilder();
-      switch (state) {
-        case OPENING:
-          // Save the tablet in the AsyncKuduScanner.  This is kind of a kludge but it really
-          // is the easiest way.
-          AsyncKuduScanner.this.tablet = super.getTablet();
-          NewScanRequestPB.Builder newBuilder = NewScanRequestPB.newBuilder();
-          newBuilder.setLimit(limit); // currently ignored
-          newBuilder.addAllProjectedColumns(ProtobufHelper.schemaToListPb(schema));
-          newBuilder.setTabletId(ZeroCopyLiteralByteString.wrap(tablet.getTabletIdAsBytes()));
-          newBuilder.setReadMode(AsyncKuduScanner.this.getReadMode().pbVersion());
-          newBuilder.setOrderMode(AsyncKuduScanner.this.getOrderMode());
-          newBuilder.setCacheBlocks(cacheBlocks);
-          // if the last propagated timestamp is set send it with the scan
-          if (table.getAsyncClient().getLastPropagatedTimestamp() != AsyncKuduClient.NO_TIMESTAMP) {
-            newBuilder.setPropagatedTimestamp(table.getAsyncClient().getLastPropagatedTimestamp());
-          }
-          newBuilder.setReadMode(AsyncKuduScanner.this.getReadMode().pbVersion());
-
-          // if the mode is set to read at snapshot, send the snapshot timestamp
-          if (AsyncKuduScanner.this.getReadMode() == ReadMode.READ_AT_SNAPSHOT &&
-            AsyncKuduScanner.this.getSnapshotTimestamp() != AsyncKuduClient.NO_TIMESTAMP) {
-            newBuilder.setSnapTimestamp(AsyncKuduScanner.this.getSnapshotTimestamp());
-          }
-
-          if (AsyncKuduScanner.this.startPrimaryKey != AsyncKuduClient.EMPTY_ARRAY &&
-              AsyncKuduScanner.this.startPrimaryKey.length > 0) {
-            newBuilder.setStartPrimaryKey(ZeroCopyLiteralByteString.copyFrom(startPrimaryKey));
-          }
-
-          if (AsyncKuduScanner.this.endPrimaryKey != AsyncKuduClient.EMPTY_ARRAY &&
-              AsyncKuduScanner.this.endPrimaryKey.length > 0) {
-            newBuilder.setStopPrimaryKey(ZeroCopyLiteralByteString.copyFrom(endPrimaryKey));
-          }
-
-          for (KuduPredicate pred : predicates.values()) {
-            newBuilder.addColumnPredicates(pred.toPB());
-          }
-          builder.setNewScanRequest(newBuilder.build())
-                 .setBatchSizeBytes(batchSizeBytes);
-          break;
-        case NEXT:
-          setTablet(AsyncKuduScanner.this.tablet);
-          builder.setScannerId(ZeroCopyLiteralByteString.wrap(scannerId))
-                 .setCallSeqId(AsyncKuduScanner.this.sequenceId)
-                 .setBatchSizeBytes(batchSizeBytes);
-          break;
-        case CLOSING:
-          setTablet(AsyncKuduScanner.this.tablet);
-          builder.setScannerId(ZeroCopyLiteralByteString.wrap(scannerId))
-                 .setBatchSizeBytes(0)
-                 .setCloseScanner(true);
-      }
-
-      ScanRequestPB request = builder.build();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Sending scan req: " + request.toString());
-      }
-
-      return toChannelBuffer(header, request);
-    }
-
-    @Override
-    Pair<Response, Object> deserialize(final CallResponse callResponse,
-                                       String tsUUID) throws Exception {
-      ScanResponsePB.Builder builder = ScanResponsePB.newBuilder();
-      readProtobuf(callResponse.getPBMessage(), builder);
-      ScanResponsePB resp = builder.build();
-      final byte[] id = resp.getScannerId().toByteArray();
-      TabletServerErrorPB error = resp.hasError() ? resp.getError() : null;
-
-      if (error != null && error.getCode().equals(TabletServerErrorPB.Code.TABLET_NOT_FOUND)) {
-        if (state == State.OPENING) {
-          // Doing this will trigger finding the new location.
-          return new Pair<Response, Object>(null, error);
-        } else {
-          Status statusIncomplete = Status.Incomplete("Cannot continue scanning, " +
-              "the tablet has moved and this isn't a fault tolerant scan");
-          throw new NonRecoverableException(statusIncomplete);
-        }
-      }
-      RowResultIterator iterator = RowResultIterator.makeRowResultIterator(
-          deadlineTracker.getElapsedMillis(), tsUUID, schema, resp.getData(),
-          callResponse);
-
-      boolean hasMore = resp.getHasMoreResults();
-      if (id.length  != 0 && scannerId != null && !Bytes.equals(scannerId, id)) {
-        Status statusIllegalState = Status.IllegalState("Scan RPC response was for scanner"
-            + " ID " + Bytes.pretty(id) + " but we expected "
-            + Bytes.pretty(scannerId));
-        throw new NonRecoverableException(statusIllegalState);
-      }
-      Response response = new Response(id, iterator, hasMore);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(response.toString());
-      }
-      return new Pair<Response, Object>(response, error);
-    }
-
-    public String toString() {
-      return "ScanRequest(scannerId=" + Bytes.pretty(scannerId)
-          + (tablet != null? ", tabletSlice=" + tablet.getTabletIdAsString() : "")
-          + ", attempt=" + attempt + ')';
-    }
-
-    @Override
-    public byte[] partitionKey() {
-      // This key is used to lookup where the request needs to go
-      return nextPartitionKey;
-    }
-  }
-
-  /**
-   * A Builder class to build {@link AsyncKuduScanner}.
-   * Use {@link AsyncKuduClient#newScannerBuilder} in order to get a builder instance.
-   */
-  @InterfaceAudience.Public
-  @InterfaceStability.Evolving
-  public static class AsyncKuduScannerBuilder
-      extends AbstractKuduScannerBuilder<AsyncKuduScannerBuilder, AsyncKuduScanner> {
-
-    AsyncKuduScannerBuilder(AsyncKuduClient client, KuduTable table) {
-      super(client, table);
-    }
-
-    /**
-     * Builds an {@link AsyncKuduScanner} using the passed configurations.
-     * @return a new {@link AsyncKuduScanner}
-     */
-    public AsyncKuduScanner build() {
-      return new AsyncKuduScanner(
-          client, table, projectedColumnNames, projectedColumnIndexes, readMode, orderMode,
-          scanRequestTimeout, predicates, limit, cacheBlocks,
-          prefetching, lowerBoundPrimaryKey, upperBoundPrimaryKey,
-          lowerBoundPartitionKey, upperBoundPartitionKey,
-          htTimestamp, batchSizeBytes);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduSession.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduSession.java b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduSession.java
deleted file mode 100644
index 0290ee7..0000000
--- a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduSession.java
+++ /dev/null
@@ -1,856 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.kududb.client;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Range;
-import com.stumbleupon.async.Callback;
-import com.stumbleupon.async.Deferred;
-import org.kududb.annotations.InterfaceAudience;
-import org.kududb.annotations.InterfaceStability;
-import org.kududb.util.AsyncUtil;
-import org.kududb.util.Slice;
-import org.jboss.netty.util.Timeout;
-import org.jboss.netty.util.TimerTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.concurrent.GuardedBy;
-import javax.annotation.concurrent.NotThreadSafe;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.kududb.client.ExternalConsistencyMode.CLIENT_PROPAGATED;
-
-/**
- * A AsyncKuduSession belongs to a specific AsyncKuduClient, and represents a context in
- * which all read/write data access should take place. Within a session,
- * multiple operations may be accumulated and batched together for better
- * efficiency. Settings like timeouts, priorities, and trace IDs are also set
- * per session.<p>
- *
- * AsyncKuduSession is separate from KuduTable because a given batch or transaction
- * may span multiple tables. This is particularly important in the future when
- * we add ACID support, but even in the context of batching, we may be able to
- * coalesce writes to different tables hosted on the same server into the same
- * RPC.<p>
- *
- * AsyncKuduSession is separate from AsyncKuduClient because, in a multi-threaded
- * application, different threads may need to concurrently execute
- * transactions. Similar to a JDBC "session", transaction boundaries will be
- * delineated on a per-session basis -- in between a "BeginTransaction" and
- * "Commit" call on a given session, all operations will be part of the same
- * transaction. Meanwhile another concurrent Session object can safely run
- * non-transactional work or other transactions without interfering.<p>
- *
- * Therefore, this class is <b>not</b> thread-safe.<p>
- *
- * Additionally, there is a guarantee that writes from different sessions do not
- * get batched together into the same RPCs -- this means that latency-sensitive
- * clients can run through the same AsyncKuduClient object as throughput-oriented
- * clients, perhaps by setting the latency-sensitive session's timeouts low and
- * priorities high. Without the separation of batches, a latency-sensitive
- * single-row insert might get batched along with 10MB worth of inserts from the
- * batch writer, thus delaying the response significantly.<p>
- *
- * Though we currently do not have transactional support, users will be forced
- * to use a AsyncKuduSession to instantiate reads as well as writes.  This will make
- * it more straight-forward to add RW transactions in the future without
- * significant modifications to the API.<p>
- *
- * Timeouts are handled differently depending on the flush mode.
- * With AUTO_FLUSH_SYNC, the timeout is set on each apply()'d operation.
- * With AUTO_FLUSH_BACKGROUND and MANUAL_FLUSH, the timeout is assigned to a whole batch of
- * operations upon flush()'ing. It means that in a situation with a timeout of 500ms and a flush
- * interval of 1000ms, an operation can be outstanding for up to 1500ms before being timed out.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Unstable
-@NotThreadSafe
-public class AsyncKuduSession implements SessionConfiguration {
-
-  public static final Logger LOG = LoggerFactory.getLogger(AsyncKuduSession.class);
-  private static final Range<Float> PERCENTAGE_RANGE = Range.closed(0.0f, 1.0f);
-
-  private final AsyncKuduClient client;
-  private final Random randomizer = new Random();
-  private final ErrorCollector errorCollector;
-  private int interval = 1000;
-  private int mutationBufferSpace = 1000; // TODO express this in terms of data size.
-  private float mutationBufferLowWatermarkPercentage = 0.5f;
-  private int mutationBufferLowWatermark;
-  private FlushMode flushMode;
-  private ExternalConsistencyMode consistencyMode;
-  private long timeoutMs;
-
-  /**
-   * Protects internal state from concurrent access. {@code AsyncKuduSession} is not threadsafe
-   * from the application's perspective, but because internally async timers and async flushing
-   * tasks may access the session concurrently with the application, synchronization is still
-   * needed.
-   */
-  private final Object monitor = new Object();
-
-  /**
-   * Tracks the currently active buffer.
-   *
-   * When in mode {@link FlushMode#AUTO_FLUSH_BACKGROUND} or {@link FlushMode#AUTO_FLUSH_SYNC},
-   * {@code AsyncKuduSession} uses double buffering to improve write throughput. While the
-   * application is {@link #apply}ing operations to one buffer (the {@code activeBuffer}), the
-   * second buffer is either being flushed, or if it has already been flushed, it waits in the
-   * {@link #inactiveBuffers} queue. When the currently active buffer is flushed,
-   * {@code activeBuffer} is set to {@code null}. On the next call to {@code apply}, an inactive
-   * buffer is taken from {@code inactiveBuffers} and made the new active buffer. If both
-   * buffers are still flushing, then the {@code apply} call throws {@link PleaseThrottleException}.
-   */
-  @GuardedBy("monitor")
-  private Buffer activeBuffer;
-
-  /**
-   * The buffers. May either be active (pointed to by {@link #activeBuffer},
-   * inactive (in the {@link #inactiveBuffers}) queue, or flushing.
-   */
-  private final Buffer bufferA = new Buffer();
-  private final Buffer bufferB = new Buffer();
-
-  /**
-   * Queue containing flushed, inactive buffers. May be accessed from callbacks (I/O threads).
-   * We restrict the session to only two buffers, so {@link BlockingQueue#add} can
-   * be used without chance of failure.
-   */
-  private final BlockingQueue<Buffer> inactiveBuffers = new ArrayBlockingQueue<>(2, false);
-
-  /**
-   * Deferred used to notify on flush events. Atomically swapped and completed every time a buffer
-   * is flushed. This can be used to notify handlers of {@link PleaseThrottleException} that more
-   * capacity may be available in the active buffer.
-   */
-  private final AtomicReference<Deferred<Void>> flushNotification =
-      new AtomicReference<>(new Deferred<Void>());
-
-  /**
-   * Tracks whether the session has been closed.
-   */
-  private volatile boolean closed = false;
-
-  private boolean ignoreAllDuplicateRows = false;
-
-  /**
-   * Package-private constructor meant to be used via AsyncKuduClient
-   * @param client client that creates this session
-   */
-  AsyncKuduSession(AsyncKuduClient client) {
-    this.client = client;
-    flushMode = FlushMode.AUTO_FLUSH_SYNC;
-    consistencyMode = CLIENT_PROPAGATED;
-    timeoutMs = client.getDefaultOperationTimeoutMs();
-    inactiveBuffers.add(bufferA);
-    inactiveBuffers.add(bufferB);
-    errorCollector = new ErrorCollector(mutationBufferSpace);
-    setMutationBufferLowWatermark(this.mutationBufferLowWatermarkPercentage);
-  }
-
-  @Override
-  public FlushMode getFlushMode() {
-    return this.flushMode;
-  }
-
-  @Override
-  public void setFlushMode(FlushMode flushMode) {
-    if (hasPendingOperations()) {
-      throw new IllegalArgumentException("Cannot change flush mode when writes are buffered");
-    }
-    this.flushMode = flushMode;
-  }
-
-  @Override
-  public void setExternalConsistencyMode(ExternalConsistencyMode consistencyMode) {
-    if (hasPendingOperations()) {
-      throw new IllegalArgumentException("Cannot change consistency mode "
-          + "when writes are buffered");
-    }
-    this.consistencyMode = consistencyMode;
-  }
-
-  @Override
-  public void setMutationBufferSpace(int size) {
-    if (hasPendingOperations()) {
-      throw new IllegalArgumentException("Cannot change the buffer" +
-          " size when operations are buffered");
-    }
-    this.mutationBufferSpace = size;
-    // Reset the low watermark, using the same percentage as before.
-    setMutationBufferLowWatermark(mutationBufferLowWatermarkPercentage);
-  }
-
-  @Override
-  public void setMutationBufferLowWatermark(float mutationBufferLowWatermarkPercentage) {
-    if (hasPendingOperations()) {
-      throw new IllegalArgumentException("Cannot change the buffer" +
-          " low watermark when operations are buffered");
-    } else if (!PERCENTAGE_RANGE.contains(mutationBufferLowWatermarkPercentage)) {
-      throw new IllegalArgumentException("The low watermark must be between 0 and 1 inclusively");
-    }
-    this.mutationBufferLowWatermarkPercentage = mutationBufferLowWatermarkPercentage;
-    this.mutationBufferLowWatermark =
-        (int)(this.mutationBufferLowWatermarkPercentage * mutationBufferSpace);
-  }
-
-  /**
-   * Lets us set a specific seed for tests
-   * @param seed
-   */
-  @VisibleForTesting
-  void setRandomSeed(long seed) {
-    this.randomizer.setSeed(seed);
-  }
-
-  @Override
-  public void setFlushInterval(int interval) {
-    this.interval = interval;
-  }
-
-  @Override
-  public void setTimeoutMillis(long timeout) {
-    this.timeoutMs = timeout;
-  }
-
-  @Override
-  public long getTimeoutMillis() {
-    return this.timeoutMs;
-  }
-
-  @Override
-  public boolean isClosed() {
-    return closed;
-  }
-
-  @Override
-  public boolean isIgnoreAllDuplicateRows() {
-    return ignoreAllDuplicateRows;
-  }
-
-  @Override
-  public void setIgnoreAllDuplicateRows(boolean ignoreAllDuplicateRows) {
-    this.ignoreAllDuplicateRows = ignoreAllDuplicateRows;
-  }
-
-  @Override
-  public int countPendingErrors() {
-    return errorCollector.countErrors();
-  }
-
-  @Override
-  public RowErrorsAndOverflowStatus getPendingErrors() {
-    return errorCollector.getErrors();
-  }
-
-  /**
-   * Flushes the buffered operations and marks this session as closed.
-   * See the javadoc on {@link #flush()} on how to deal with exceptions coming out of this method.
-   * @return a Deferred whose callback chain will be invoked when.
-   * everything that was buffered at the time of the call has been flushed.
-   */
-  public Deferred<List<OperationResponse>> close() {
-    if (!closed) {
-      closed = true;
-      client.removeSession(this);
-    }
-    return flush();
-  }
-
-  /**
-   * Returns a buffer to the inactive queue after flushing.
-   * @param buffer the buffer to return to the inactive queue.
-   */
-  private void queueBuffer(Buffer buffer) {
-    buffer.callbackFlushNotification();
-    Deferred<Void> localFlushNotification = flushNotification.getAndSet(new Deferred<Void>());
-    inactiveBuffers.add(buffer);
-    localFlushNotification.callback(null);
-  }
-
-  /**
-   * Callback which waits for all tablet location lookups to complete, groups all operations into
-   * batches by tablet, and dispatches them. When all of the batches are complete, a deferred is
-   * fired and the buffer is added to the inactive queue.
-   */
-  private final class TabletLookupCB implements Callback<Void, Object> {
-    private final AtomicInteger lookupsOutstanding;
-    private final Buffer buffer;
-    private final Deferred<List<BatchResponse>> deferred;
-
-    public TabletLookupCB(Buffer buffer, Deferred<List<BatchResponse>> deferred) {
-      this.lookupsOutstanding = new AtomicInteger(buffer.getOperations().size());
-      this.buffer = buffer;
-      this.deferred = deferred;
-    }
-
-    @Override
-    public Void call(Object _void) throws Exception {
-      if (lookupsOutstanding.decrementAndGet() != 0) return null;
-
-      // The final tablet lookup is complete. Batch all of the buffered
-      // operations into their respective tablet, and then send the batches.
-
-      // Group the operations by tablet.
-      Map<Slice, Batch> batches = new HashMap<>();
-      List<OperationResponse> opsFailedInLookup = new ArrayList<>();
-
-      for (BufferedOperation bufferedOp : buffer.getOperations()) {
-        Operation operation = bufferedOp.getOperation();
-        if (bufferedOp.tabletLookupFailed()) {
-          Exception failure = bufferedOp.getTabletLookupFailure();
-          RowError error;
-          if (failure instanceof NonCoveredRangeException) {
-            // TODO: this should be something different than NotFound so that
-            // applications can distinguish from updates on missing rows.
-            error = new RowError(Status.NotFound(failure.getMessage()), operation);
-          } else {
-            LOG.warn("unexpected tablet lookup failure for operation {}", operation, failure);
-            error = new RowError(Status.RuntimeError(failure.getMessage()), operation);
-          }
-          OperationResponse response = new OperationResponse(0, null, 0, operation, error);
-          // Add the row error to the error collector if the session is in background flush mode,
-          // and complete the operation's deferred with the error response. The ordering between
-          // adding to the error collector and completing the deferred should not matter since
-          // applications should be using one or the other method for error handling, not both.
-          if (flushMode == FlushMode.AUTO_FLUSH_BACKGROUND) {
-            errorCollector.addError(error);
-          }
-          operation.callback(response);
-          opsFailedInLookup.add(response);
-          continue;
-        }
-        LocatedTablet tablet = bufferedOp.getTablet();
-        Slice tabletId = new Slice(tablet.getTabletId());
-
-        Batch batch = batches.get(tabletId);
-        if (batch == null) {
-          batch = new Batch(operation.getTable(), tablet, ignoreAllDuplicateRows);
-          batches.put(tabletId, batch);
-        }
-        batch.add(operation);
-      }
-
-      List<Deferred<BatchResponse>> batchResponses = new ArrayList<>(batches.size() + 1);
-      if (!opsFailedInLookup.isEmpty()) {
-        batchResponses.add(Deferred.fromResult(new BatchResponse(opsFailedInLookup)));
-      }
-
-      for (Batch batch : batches.values()) {
-        if (timeoutMs != 0) {
-          batch.deadlineTracker.reset();
-          batch.setTimeoutMillis(timeoutMs);
-        }
-        addBatchCallbacks(batch);
-        batchResponses.add(client.sendRpcToTablet(batch));
-      }
-
-      // On completion of all batches, fire the completion deferred, and add the buffer
-      // back to the inactive buffers queue. This frees it up for new inserts.
-      AsyncUtil.addBoth(
-          Deferred.group(batchResponses),
-          new Callback<Void, Object>() {
-            @Override
-            public Void call(Object responses) {
-              queueBuffer(buffer);
-              deferred.callback(responses);
-              return null;
-            }
-          });
-
-      return null;
-    }
-  }
-
-  /**
-   * Flush buffered writes.
-   * @return a {@link Deferred} whose callback chain will be invoked when all applied operations at
-   *         the time of the call have been flushed.
-   */
-  public Deferred<List<OperationResponse>> flush() {
-    Buffer buffer;
-    Deferred<Void> nonActiveBufferFlush;
-    synchronized (monitor) {
-      nonActiveBufferFlush = getNonActiveFlushNotification();
-      buffer = activeBuffer;
-      activeBuffer = null;
-    }
-
-    final Deferred<List<OperationResponse>> activeBufferFlush = buffer == null ?
-        Deferred.<List<OperationResponse>>fromResult(ImmutableList.<OperationResponse>of()) :
-        doFlush(buffer);
-
-    return AsyncUtil.addBothDeferring(nonActiveBufferFlush,
-                                      new Callback<Deferred<List<OperationResponse>>, Object>() {
-                                        @Override
-                                        public Deferred<List<OperationResponse>> call(Object arg) {
-                                          return activeBufferFlush;
-                                        }
-                                      });
-  }
-
-  /**
-   * Flushes a write buffer. This method takes ownership of the buffer, no other concurrent access
-   * is allowed.
-   *
-   * @param buffer the buffer to flush, must not be modified once passed to this method
-   * @return the operation responses
-   */
-  private Deferred<List<OperationResponse>> doFlush(Buffer buffer) {
-    LOG.debug("flushing buffer: {}", buffer);
-    if (buffer.getOperations().isEmpty()) {
-      // no-op.
-      return Deferred.<List<OperationResponse>>fromResult(ImmutableList.<OperationResponse>of());
-    }
-
-    Deferred<List<BatchResponse>> batchResponses = new Deferred<>();
-    Callback<Void, Object> tabletLookupCB = new TabletLookupCB(buffer, batchResponses);
-
-    for (BufferedOperation bufferedOperation : buffer.getOperations()) {
-      AsyncUtil.addBoth(bufferedOperation.getTabletLookup(), tabletLookupCB);
-    }
-
-    return batchResponses.addCallback(ConvertBatchToListOfResponsesCB.getInstance());
-  }
-
-  /**
-   * Callback used to send a list of OperationResponse instead of BatchResponse since the
-   * latter is an implementation detail.
-   */
-  private static class ConvertBatchToListOfResponsesCB implements Callback<List<OperationResponse>,
-                                                                           List<BatchResponse>> {
-    private static final ConvertBatchToListOfResponsesCB INSTANCE =
-        new ConvertBatchToListOfResponsesCB();
-    @Override
-    public List<OperationResponse> call(List<BatchResponse> batchResponses) throws Exception {
-      // First compute the size of the union of all the lists so that we don't trigger expensive
-      // list growths while adding responses to it.
-      int size = 0;
-      for (BatchResponse batchResponse : batchResponses) {
-        size += batchResponse.getIndividualResponses().size();
-      }
-
-      ArrayList<OperationResponse> responses = new ArrayList<>(size);
-      for (BatchResponse batchResponse : batchResponses) {
-        responses.addAll(batchResponse.getIndividualResponses());
-      }
-
-      return responses;
-    }
-    @Override
-    public String toString() {
-      return "ConvertBatchToListOfResponsesCB";
-    }
-    public static ConvertBatchToListOfResponsesCB getInstance() {
-      return INSTANCE;
-    }
-  }
-
-  @Override
-  public boolean hasPendingOperations() {
-    synchronized (monitor) {
-      return activeBuffer == null ? inactiveBuffers.size() < 2 :
-             activeBuffer.getOperations().size() > 0 || !inactiveBufferAvailable();
-    }
-  }
-
-  /**
-   * Apply the given operation.
-   * The behavior of this function depends on the current flush mode. Regardless
-   * of flush mode, however, Apply may begin to perform processing in the background
-   * for the call (e.g looking up the tablet, etc).
-   * @param operation operation to apply
-   * @return a Deferred to track this operation
-   * @throws KuduException if an error happens or {@link PleaseThrottleException} is triggered
-   */
-  public Deferred<OperationResponse> apply(final Operation operation) throws KuduException {
-    Preconditions.checkNotNull(operation, "Can not apply a null operation");
-
-    // Freeze the row so that the client can not concurrently modify it while it is in flight.
-    operation.getRow().freeze();
-
-    // If immediate flush mode, send the operation directly.
-    if (flushMode == FlushMode.AUTO_FLUSH_SYNC) {
-      if (timeoutMs != 0) {
-        operation.setTimeoutMillis(timeoutMs);
-      }
-      operation.setExternalConsistencyMode(this.consistencyMode);
-      operation.setIgnoreAllDuplicateRows(ignoreAllDuplicateRows);
-      return client.sendRpcToTablet(operation);
-    }
-
-    // Kick off a location lookup.
-    Deferred<LocatedTablet> tablet = client.getTabletLocation(operation.getTable(),
-                                                              operation.partitionKey(),
-                                                              timeoutMs);
-
-    // Holds a buffer that should be flushed outside the synchronized block, if necessary.
-    Buffer fullBuffer = null;
-    try {
-      synchronized (monitor) {
-        if (activeBuffer == null) {
-          // If the active buffer is null then we recently flushed. Check if there
-          // is an inactive buffer available to replace as the active.
-          if (inactiveBufferAvailable()) {
-            refreshActiveBuffer();
-          } else {
-            Status statusServiceUnavailable =
-                Status.ServiceUnavailable("All buffers are currently flushing");
-            // This can happen if the user writes into a buffer, flushes it, writes
-            // into the second, flushes it, and immediately tries to write again.
-            throw new PleaseThrottleException(statusServiceUnavailable,
-                                              null, operation, flushNotification.get());
-          }
-        }
-
-        if (flushMode == FlushMode.MANUAL_FLUSH) {
-          if (activeBuffer.getOperations().size() < mutationBufferSpace) {
-            activeBuffer.getOperations().add(new BufferedOperation(tablet, operation));
-          } else {
-            Status statusIllegalState =
-                Status.IllegalState("MANUAL_FLUSH is enabled but the buffer is too big");
-            throw new NonRecoverableException(statusIllegalState);
-          }
-        } else {
-          assert flushMode == FlushMode.AUTO_FLUSH_BACKGROUND;
-          int activeBufferSize = activeBuffer.getOperations().size();
-
-          if (activeBufferSize >= mutationBufferSpace) {
-            // Save the active buffer into fullBuffer so that it gets flushed when we leave this
-            // synchronized block.
-            fullBuffer = activeBuffer;
-            activeBuffer = null;
-            activeBufferSize = 0;
-            if (inactiveBufferAvailable()) {
-              refreshActiveBuffer();
-            } else {
-              Status statusServiceUnavailable =
-                  Status.ServiceUnavailable("All buffers are currently flushing");
-              throw new PleaseThrottleException(statusServiceUnavailable,
-                                                null, operation, flushNotification.get());
-            }
-          }
-
-          if (mutationBufferLowWatermark < mutationBufferSpace && // low watermark is enabled
-              activeBufferSize >= mutationBufferLowWatermark &&   // buffer is over low water mark
-              !inactiveBufferAvailable()) {                       // no inactive buffers
-
-            // Check if we are over the low water mark.
-            int randomWatermark = activeBufferSize + 1 +
-                                  randomizer.nextInt(mutationBufferSpace -
-                                                     mutationBufferLowWatermark);
-
-            if (randomWatermark > mutationBufferSpace) {
-              Status statusServiceUnavailable =
-                  Status.ServiceUnavailable("The previous buffer hasn't been flushed and the " +
-                      "current buffer is over the low watermark, please retry later");
-              throw new PleaseThrottleException(statusServiceUnavailable,
-                                                null, operation, flushNotification.get());
-            }
-          }
-
-          activeBuffer.getOperations().add(new BufferedOperation(tablet, operation));
-
-          if (activeBufferSize + 1 >= mutationBufferSpace && inactiveBufferAvailable()) {
-            // If the operation filled the buffer, then flush it.
-            Preconditions.checkState(fullBuffer == null);
-            fullBuffer = activeBuffer;
-            activeBuffer = null;
-            activeBufferSize = 0;
-          } else if (activeBufferSize == 0) {
-            // If this is the first operation in the buffer, start a background flush timer.
-            client.newTimeout(activeBuffer.getFlusherTask(), interval);
-          }
-        }
-      }
-    } finally {
-      // Flush the buffer outside of the synchronized block, if required.
-      if (fullBuffer != null) {
-        doFlush(fullBuffer);
-      }
-    }
-    return operation.getDeferred();
-  }
-
-  /**
-   * Returns {@code true} if there is an inactive buffer available.
-   * @return true if there is currently an inactive buffer available
-   */
-  private boolean inactiveBufferAvailable() {
-    return inactiveBuffers.peek() != null;
-  }
-
-  /**
-   * Refreshes the active buffer. This should only be called after a
-   * {@link #flush()} when the active buffer is {@code null}, there is an
-   * inactive buffer available (see {@link #inactiveBufferAvailable()}, and
-   * {@link #monitor} is locked.
-   */
-  @GuardedBy("monitor")
-  private void refreshActiveBuffer() {
-    Preconditions.checkState(activeBuffer == null);
-    activeBuffer = inactiveBuffers.remove();
-    activeBuffer.reset();
-  }
-
-  /**
-   * Returns a flush notification for the currently non-active buffers.
-   * This is used during manual {@link #flush} calls to ensure that all buffers (not just the active
-   * buffer) are fully flushed before completing.
-   */
-  @GuardedBy("monitor")
-  private Deferred<Void> getNonActiveFlushNotification() {
-    final Deferred<Void> notificationA = bufferA.getFlushNotification();
-    final Deferred<Void> notificationB = bufferB.getFlushNotification();
-    if (activeBuffer == null) {
-      // Both buffers are either flushing or inactive.
-      return AsyncUtil.addBothDeferring(notificationA, new Callback<Deferred<Void>, Object>() {
-        @Override
-        public Deferred<Void> call(Object _obj) throws Exception {
-          return notificationB;
-        }
-      });
-    } else if (activeBuffer == bufferA) {
-      return notificationB;
-    } else {
-      return notificationA;
-    }
-  }
-
-  /**
-   * Creates callbacks to handle a multi-put and adds them to the request.
-   * @param request the request for which we must handle the response
-   */
-  private void addBatchCallbacks(final Batch request) {
-    final class BatchCallback implements Callback<BatchResponse, BatchResponse> {
-      public BatchResponse call(final BatchResponse response) {
-        LOG.trace("Got a Batch response for {} rows", request.operations.size());
-        if (response.getWriteTimestamp() != 0) {
-          AsyncKuduSession.this.client.updateLastPropagatedTimestamp(response.getWriteTimestamp());
-        }
-
-        // Send individualized responses to all the operations in this batch.
-        for (OperationResponse operationResponse : response.getIndividualResponses()) {
-          operationResponse.getOperation().callback(operationResponse);
-          if (flushMode == FlushMode.AUTO_FLUSH_BACKGROUND && operationResponse.hasRowError()) {
-            errorCollector.addError(operationResponse.getRowError());
-          }
-        }
-
-        return response;
-      }
-
-      @Override
-      public String toString() {
-        return "apply batch response";
-      }
-    }
-
-    final class BatchErrCallback implements Callback<Exception, Exception> {
-      @Override
-      public Exception call(Exception e) {
-        // Send the same exception to all the operations.
-        for (Operation operation : request.operations) {
-          operation.errback(e);
-        }
-        return e;
-      }
-      @Override
-      public String toString() {
-        return "apply batch error response";
-      }
-    }
-
-    request.getDeferred().addCallbacks(new BatchCallback(), new BatchErrCallback());
-  }
-
-  /**
-   * A FlusherTask is created for each active buffer in mode
-   * {@link FlushMode#AUTO_FLUSH_BACKGROUND}.
-   */
-  private final class FlusherTask implements TimerTask {
-    public void run(final Timeout timeout) {
-      Buffer buffer = null;
-      synchronized (monitor) {
-        if (activeBuffer == null) {
-          return;
-        }
-        if (activeBuffer.getFlusherTask() == this) {
-          buffer = activeBuffer;
-          activeBuffer = null;
-        }
-      }
-
-      if (buffer != null) {
-        doFlush(buffer);
-      }
-    }
-  }
-
-  /**
-   * The {@code Buffer} consists of a list of operations, an optional pointer to a flush task,
-   * and a flush notification.
-   *
-   * The {@link #flusherTask} is used in mode {@link FlushMode#AUTO_FLUSH_BACKGROUND} to point to
-   * the background flusher task assigned to the buffer when it becomes active and the first
-   * operation is applied to it. When the flusher task executes after the timeout, it checks
-   * that the currently active buffer's flusher task points to itself before executing the flush.
-   * This protects against the background task waking up after one or more manual flushes and
-   * attempting to flush the active buffer.
-   *
-   * The {@link #flushNotification} deferred is used when executing manual {@link #flush}es to
-   * ensure that non-active buffers are fully flushed. {@code flushNotification} is completed
-   * when this buffer is successfully flushed. When the buffer is promoted from inactive to active,
-   * the deferred is replaced with a new one to indicate that the buffer is not yet flushed.
-   *
-   * Buffer is externally synchronized. When the active buffer, {@link #monitor}
-   * synchronizes access to it.
-   */
-  private final class Buffer {
-    private final List<BufferedOperation> operations = new ArrayList<>();
-
-    private FlusherTask flusherTask = null;
-
-    private Deferred<Void> flushNotification = Deferred.fromResult(null);
-
-    public List<BufferedOperation> getOperations() {
-      return operations;
-    }
-
-    @GuardedBy("monitor")
-    public FlusherTask getFlusherTask() {
-      if (flusherTask == null) {
-        flusherTask = new FlusherTask();
-      }
-      return flusherTask;
-    }
-
-    /**
-     * Returns a {@link Deferred} which will be completed when this buffer is flushed. If the buffer
-     * is inactive (its flush is complete and it has been enqueued into {@link #inactiveBuffers}),
-     * then the deferred will already be complete.
-     */
-    public Deferred<Void> getFlushNotification() {
-      return flushNotification;
-    }
-
-    /**
-     * Completes the buffer's flush notification. Should be called when the buffer has been
-     * successfully flushed.
-     */
-    public void callbackFlushNotification() {
-      LOG.trace("buffer flush notification fired: {}", this);
-      flushNotification.callback(null);
-    }
-
-    /**
-     * Resets the buffer's internal state. Should be called when the buffer is promoted from
-     * inactive to active.
-     */
-    @GuardedBy("monitor")
-    public void reset() {
-      LOG.trace("buffer reset: {}", this);
-      operations.clear();
-      flushNotification = new Deferred<>();
-      flusherTask = null;
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(this)
-                        .add("operations", operations.size())
-                        .add("flusherTask", flusherTask)
-                        .add("flushNotification", flushNotification)
-                        .toString();
-    }
-  }
-
-  /**
-   * Container class holding all the state associated with a buffered operation.
-   */
-  private static final class BufferedOperation {
-    /** Holds either a {@link LocatedTablet} or the failure exception if the lookup failed. */
-    private Object tablet = null;
-    private final Deferred<Void> tabletLookup;
-    private final Operation operation;
-
-    public BufferedOperation(Deferred<LocatedTablet> tablet,
-                             Operation operation) {
-      tabletLookup = AsyncUtil.addBoth(tablet, new Callback<Void, Object>() {
-        @Override
-        public Void call(final Object tablet) {
-          BufferedOperation.this.tablet = tablet;
-          return null;
-        }
-      });
-      this.operation = Preconditions.checkNotNull(operation);
-    }
-
-    /**
-     * @return {@code true} if the tablet lookup failed.
-     */
-    public boolean tabletLookupFailed() {
-      return !(tablet instanceof LocatedTablet);
-    }
-
-    /**
-     * @return the located tablet
-     * @throws ClassCastException if the tablet lookup failed,
-     *         check with {@link #tabletLookupFailed} before calling
-     */
-    public LocatedTablet getTablet() {
-      return (LocatedTablet) tablet;
-    }
-
-    /**
-     * @return the cause of the failed lookup
-     * @throws ClassCastException if the tablet lookup succeeded,
-     *         check with {@link #tabletLookupFailed} before calling
-     */
-    public Exception getTabletLookupFailure() {
-      return (Exception) tablet;
-    }
-
-    public Deferred<Void> getTabletLookup() {
-      return tabletLookup;
-    }
-
-    public Operation getOperation() {
-      return operation;
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(this)
-                        .add("tablet", tablet)
-                        .add("operation", operation)
-                        .toString();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-client/src/main/java/org/kududb/client/Batch.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/Batch.java b/java/kudu-client/src/main/java/org/kududb/client/Batch.java
deleted file mode 100644
index ed1d870..0000000
--- a/java/kudu-client/src/main/java/org/kududb/client/Batch.java
+++ /dev/null
@@ -1,199 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.kududb.client;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.MoreObjects;
-import com.google.protobuf.Message;
-import com.google.protobuf.ZeroCopyLiteralByteString;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.kududb.WireProtocol;
-import org.kududb.annotations.InterfaceAudience;
-import org.kududb.client.Statistics.Statistic;
-import org.kududb.client.Statistics.TabletStatistics;
-import org.kududb.tserver.Tserver;
-import org.kududb.tserver.Tserver.TabletServerErrorPB;
-import org.kududb.util.Pair;
-import org.kududb.util.Slice;
-
-/**
- * Used internally to group Operations for a single tablet together before sending to the tablet
- * server.
- *
- * <p>The batch is serialized as a single write request addressed to one tablet, and the
- * response is turned into a {@link BatchResponse} holding one entry per batched operation.
- */
-@InterfaceAudience.Private
-class Batch extends KuduRpc<BatchResponse> {
-
-  /** Holds batched operations. */
-  final List<Operation> operations = new ArrayList<>();
-
-  /** The tablet this batch will be routed to. */
-  private final LocatedTablet tablet;
-
-  /**
-   * This size will be set when serialize is called. It stands for the size of rows in all
-   * operations in this batch.
-   */
-  private long rowOperationsSizeBytes = 0;
-
-  /** See {@link SessionConfiguration#setIgnoreAllDuplicateRows(boolean)} */
-  private final boolean ignoreAllDuplicateRows;
-
-
-  /**
-   * Creates an empty batch.
-   * @param table the table the operations belong to, handed to the parent RPC
-   * @param tablet the tablet this batch will be routed to
-   * @param ignoreAllDuplicateRows whether a response whose row errors are all
-   *        ALREADY_PRESENT should be reported without any row error
-   */
-  Batch(KuduTable table, LocatedTablet tablet, boolean ignoreAllDuplicateRows) {
-    super(table);
-    this.ignoreAllDuplicateRows = ignoreAllDuplicateRows;
-    this.tablet = tablet;
-  }
-
-  /**
-   * Returns the bytes size of this batch's row operations after serialization.
-   * @return size in bytes
-   * @throws IllegalStateException thrown if this RPC hasn't been serialized eg sent to a TS
-   */
-  long getRowOperationsSizeBytes() {
-    // Zero doubles as the "not serialized yet" marker, see serialize().
-    if (this.rowOperationsSizeBytes == 0) {
-      throw new IllegalStateException("This row hasn't been serialized yet");
-    }
-    return this.rowOperationsSizeBytes;
-  }
-
-  /**
-   * Appends an operation to this batch.
-   * @param operation the operation to add; its partition key is expected to fall inside the
-   *        partition of this batch's tablet (only checked when assertions are enabled)
-   */
-  public void add(Operation operation) {
-    // Key must be >= the partition start and, unless the end key is empty, < the partition end.
-    assert Bytes.memcmp(operation.partitionKey(),
-                        tablet.getPartition().getPartitionKeyStart()) >= 0 &&
-           (tablet.getPartition().getPartitionKeyEnd().length == 0 ||
-            Bytes.memcmp(operation.partitionKey(),
-                         tablet.getPartition().getPartitionKeyEnd()) < 0);
-
-    operations.add(operation);
-  }
-
-  /**
-   * Builds the write request for all batched operations and records the size of the
-   * serialized rows so that {@link #getRowOperationsSizeBytes()} can report it.
-   * @param header the RPC header to serialize along with the request
-   * @return the buffer holding the header and the write request
-   */
-  @Override
-  ChannelBuffer serialize(Message header) {
-    final Tserver.WriteRequestPB.Builder builder = Operation.createAndFillWriteRequestPB(operations);
-    // Rows plus indirect data make up the payload size reported in the statistics.
-    rowOperationsSizeBytes = builder.getRowOperations().getRows().size() +
-                             builder.getRowOperations().getIndirectData().size();
-    builder.setTabletId(ZeroCopyLiteralByteString.wrap(getTablet().getTabletIdAsBytes()));
-    builder.setExternalConsistencyMode(externalConsistencyMode.pbVersion());
-    return toChannelBuffer(header, builder.build());
-  }
-
-  @Override
-  String serviceName() {
-    return TABLET_SERVER_SERVICE_NAME;
-  }
-
-  @Override
-  String method() {
-    return Operation.METHOD;
-  }
-
-  /**
-   * Parses the write response and converts it into a {@link BatchResponse}.
-   *
-   * <p>When duplicate rows are ignored and every per-row error is ALREADY_PRESENT, the
-   * response is built without any row error. When an error was injected through
-   * {@link #injectTabletServerErrorAndLatency}, it is returned in place of the error carried
-   * by the response, after sleeping for the injected latency if one was set.
-   *
-   * @param callResponse the raw response to read the protobuf message from
-   * @param tsUUID the UUID passed on to the created response and its row errors
-   * @return the batch response, paired with the injected error, the response's error, or
-   *         {@code null} when there is none
-   */
-  @Override
-  Pair<BatchResponse, Object> deserialize(CallResponse callResponse,
-                                          String tsUUID) throws Exception {
-    Tserver.WriteResponsePB.Builder builder = Tserver.WriteResponsePB.newBuilder();
-    readProtobuf(callResponse.getPBMessage(), builder);
-
-    List<Tserver.WriteResponsePB.PerRowErrorPB> errorsPB = builder.getPerRowErrorsList();
-    if (ignoreAllDuplicateRows) {
-      boolean allAlreadyPresent = true;
-      for (Tserver.WriteResponsePB.PerRowErrorPB errorPB : errorsPB) {
-        if (errorPB.getError().getCode() != WireProtocol.AppStatusPB.ErrorCode.ALREADY_PRESENT) {
-          allAlreadyPresent = false;
-          break;
-        }
-      }
-      // Errors are dropped only when all of them are duplicates; a single error of another
-      // kind keeps the whole list, duplicates included.
-      if (allAlreadyPresent) {
-        errorsPB = Collections.emptyList();
-      }
-    }
-
-    BatchResponse response = new BatchResponse(deadlineTracker.getElapsedMillis(), tsUUID,
-                                               builder.getTimestamp(), errorsPB, operations);
-
-    if (injectedError != null) {
-      if (injectedlatencyMs > 0) {
-        try {
-          Thread.sleep(injectedlatencyMs);
-        } catch (InterruptedException e) {
-          // Cut the simulated latency short, but restore the interrupt status so that the
-          // code running this thread can still observe the interruption.
-          Thread.currentThread().interrupt();
-        }
-      }
-      return new Pair<BatchResponse, Object>(response, injectedError);
-    }
-
-    return new Pair<BatchResponse, Object>(response, builder.hasError() ? builder.getError() : null);
-  }
-
-  /**
-   * @return the start key of the partition of this batch's tablet
-   */
-  @Override
-  public byte[] partitionKey() {
-    return tablet.getPartition().getPartitionKeyStart();
-  }
-
-  @Override
-  boolean isRequestTracked() {
-    return true;
-  }
-
-  /**
-   * Updates the per-tablet statistics with the outcome of this batch.
-   * @param statistics the statistics holder to update
-   * @param response the response to this batch; {@code null} is counted as one RPC error
-   *        plus one operation error per batched operation
-   */
-  @Override
-  void updateStatistics(Statistics statistics, BatchResponse response) {
-    Slice tabletId = this.getTablet().getTabletId();
-    String tableName = this.getTable().getName();
-    TabletStatistics tabletStatistics = statistics.getTabletStatistics(tableName, tabletId);
-    if (response == null) {
-      tabletStatistics.incrementStatistic(Statistic.OPS_ERRORS, operations.size());
-      tabletStatistics.incrementStatistic(Statistic.RPC_ERRORS, 1);
-      return;
-    }
-    tabletStatistics.incrementStatistic(Statistic.WRITE_RPCS, 1);
-    // Each operation counts either as an error or as a successful write.
-    for (OperationResponse opResponse : response.getIndividualResponses()) {
-      if (opResponse.hasRowError()) {
-        tabletStatistics.incrementStatistic(Statistic.OPS_ERRORS, 1);
-      } else {
-        tabletStatistics.incrementStatistic(Statistic.WRITE_OPS, 1);
-      }
-    }
-    tabletStatistics.incrementStatistic(Statistic.BYTES_WRITTEN, getRowOperationsSizeBytes());
-  }
-
-  @Override
-  public String toString() {
-    return MoreObjects.toStringHelper(this)
-                      .add("operations", operations.size())
-                      .add("tablet", tablet)
-                      .add("ignoreAllDuplicateRows", ignoreAllDuplicateRows)
-                      .toString();
-  }
-
-  /** Error returned by deserialize in place of the response's own error, when not null. */
-  private static TabletServerErrorPB injectedError;
-  /** Time in milliseconds deserialize sleeps before returning the injected error. */
-  private static int injectedlatencyMs;
-
-  /**
-   * Inject tablet server side error for Batch rpc related tests.
-   * @param error error response from tablet server
-   * @param latencyMs blocks response handling thread for some time to simulate
-   * write latency
-   */
-  @VisibleForTesting
-  static void injectTabletServerErrorAndLatency(TabletServerErrorPB error, int latencyMs) {
-    injectedError = error;
-    injectedlatencyMs = latencyMs;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-client/src/main/java/org/kududb/client/BatchResponse.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/BatchResponse.java b/java/kudu-client/src/main/java/org/kududb/client/BatchResponse.java
deleted file mode 100644
index f67153b..0000000
--- a/java/kudu-client/src/main/java/org/kududb/client/BatchResponse.java
+++ /dev/null
@@ -1,105 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.kududb.client;
-
-import com.google.common.collect.ImmutableList;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.kududb.annotations.InterfaceAudience;
-import org.kududb.tserver.Tserver;
-
-/**
- * Response type for Batch (which is used internally by AsyncKuduSession).
- * Carries the Hybrid Time write timestamp returned by the Tablet Server, along with one
- * {@link OperationResponse} per operation of the batch.
- */
-@InterfaceAudience.Private
-public class BatchResponse extends KuduRpcResponse {
-
-  private final long writeTimestamp;
-  private final List<RowError> rowErrors;
-  private final List<OperationResponse> individualResponses;
-
-  /**
-   * Package-private constructor to be used by the RPCs.
-   * @param elapsedMillis time in milliseconds since RPC creation to now
-   * @param tsUUID UUID handed to the parent response, to the row errors and to the individual
-   *               responses
-   * @param writeTimestamp HT's write timestamp
-   * @param errorsPB a list of row errors, can be empty
-   * @param operations the list of operations which created this response
-   */
-  BatchResponse(long elapsedMillis, String tsUUID, long writeTimestamp,
-                List<Tserver.WriteResponsePB.PerRowErrorPB> errorsPB,
-                List<Operation> operations) {
-    super(elapsedMillis, tsUUID);
-    this.writeTimestamp = writeTimestamp;
-    this.individualResponses = new ArrayList<>(operations.size());
-    this.rowErrors = errorsPB.isEmpty()
-        ? Collections.<RowError>emptyList()
-        : new ArrayList<RowError>(errorsPB.size());
-
-    // Walk the operations in order, pairing each with its error when it has one. Only some of
-    // the rows may have errors, and 'errorsPB' lists them in the same order as the operations
-    // that were sent, so a single cursor over the errors is enough.
-    final int errorCount = errorsPB.size();
-    int nextError = 0;
-    int rowIndex = 0;
-    for (Operation sentOperation : operations) {
-      RowError errorForRow = null;
-      if (nextError < errorCount) {
-        final Tserver.WriteResponsePB.PerRowErrorPB candidate = errorsPB.get(nextError);
-        if (candidate.getRowIndex() == rowIndex) {
-          errorForRow = RowError.fromRowErrorPb(candidate, sentOperation, tsUUID);
-          rowErrors.add(errorForRow);
-          nextError++;
-        }
-      }
-      final long operationElapsedMillis = sentOperation.deadlineTracker.getElapsedMillis();
-      individualResponses.add(
-          new OperationResponse(operationElapsedMillis, tsUUID, writeTimestamp,
-              sentOperation, errorForRow));
-      rowIndex++;
-    }
-    assert (rowErrors.size() == errorsPB.size());
-    assert (individualResponses.size() == operations.size());
-  }
-
-  /**
-   * Builds a response directly from already created individual responses. The elapsed time and
-   * write timestamp are 0, the tablet server UUID is null and there are no row errors.
-   * @param individualResponses the responses to expose, stored as is (not copied)
-   */
-  BatchResponse(List<OperationResponse> individualResponses) {
-    super(0, null);
-    this.individualResponses = individualResponses;
-    this.rowErrors = ImmutableList.of();
-    this.writeTimestamp = 0;
-  }
-
-  /**
-   * Gives the write timestamp that was returned by the Tablet Server.
-   * @return a timestamp in milliseconds, 0 if the external consistency mode set in AsyncKuduSession
-   * wasn't CLIENT_PROPAGATED
-   */
-  public long getWriteTimestamp() {
-    return this.writeTimestamp;
-  }
-
-  /**
-   * Package-private method to get the individual responses.
-   * @return a list of OperationResponses, one per operation when built from a write response
-   */
-  List<OperationResponse> getIndividualResponses() {
-    return this.individualResponses;
-  }
-
-}