You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2016/01/05 19:53:13 UTC

[1/3] incubator-kudu git commit: KUDU-1004. java client: Supply byte array, get back ByteBuffer

Repository: incubator-kudu
Updated Branches:
  refs/heads/master 07bededa7 -> 63b9268f9


KUDU-1004. java client: Supply byte array, get back ByteBuffer

This patch adds the missing methods, which is simple, but to handle
ByteBuffers correctly we need to stop storing byte[]s inside PartialRow.

This patch does that refactoring and augments the unit tests.

Change-Id: Ia9b4a743a42e034ca1accb1966e1a61f1f54ae45
Reviewed-on: http://gerrit.cloudera.org:8080/1647
Reviewed-by: Mike Percy <mp...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/170c32c0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/170c32c0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/170c32c0

Branch: refs/heads/master
Commit: 170c32c08dc8afb9c3d0e067f7ba35244ed42e2e
Parents: 07beded
Author: Jean-Daniel Cryans <jd...@cloudera.com>
Authored: Wed Dec 16 10:15:03 2015 -0800
Committer: Jean-Daniel Cryans <jd...@gerrit.cloudera.org>
Committed: Wed Dec 23 16:42:21 2015 +0000

----------------------------------------------------------------------
 .../main/java/org/kududb/client/KeyEncoder.java | 58 +++++++--------
 .../main/java/org/kududb/client/Operation.java  | 34 +++++----
 .../main/java/org/kududb/client/PartialRow.java | 77 +++++++++++++++++---
 .../main/java/org/kududb/client/RowResult.java  |  6 +-
 .../java/org/kududb/client/BaseKuduTest.java    |  3 +-
 .../java/org/kududb/client/TestRowResult.java   | 27 ++++---
 6 files changed, 135 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/170c32c0/java/kudu-client/src/main/java/org/kududb/client/KeyEncoder.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/KeyEncoder.java b/java/kudu-client/src/main/java/org/kududb/client/KeyEncoder.java
index 5ec1359..b54828d 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/KeyEncoder.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/KeyEncoder.java
@@ -110,24 +110,42 @@ class KeyEncoder {
     final Type type = column.getType();
 
     if (type == Type.STRING || type == Type.BINARY) {
-      addComponent(row.getVarLengthData().get(columnIdx), type, isLast);
+      addBinaryComponent(row.getVarLengthData().get(columnIdx), isLast);
     } else {
       addComponent(row.getRowAlloc(),
                    schema.getColumnOffset(columnIdx),
                    type.getSize(),
-                   type,
-                   isLast);
+                   type);
     }
   }
 
   /**
-   * Encodes a value of the given type into the key.
+   * Encodes a byte buffer into the key.
    * @param value the value to encode
-   * @param type the type of the value to encode
    * @param isLast whether the value is the final component in the key
    */
-  private void addComponent(byte[] value, Type type, boolean isLast) {
-    addComponent(value, 0, value.length, type, isLast);
+  private void addBinaryComponent(ByteBuffer value, boolean isLast) {
+    value.reset();
+
+    // TODO find a way to not have to read byte-by-byte that doesn't require extra copies. This is
+    // especially slow now that users can pass direct byte buffers.
+    while (value.hasRemaining()) {
+      byte currentByte = value.get();
+      buf.write(currentByte);
+      if (!isLast && currentByte == 0x00) {
+        // If we're a middle component of a composite key, we need to add a \x00
+        // at the end in order to separate this component from the next one. However,
+        // if we just did that, we'd have issues where a key that actually has
+        // \x00 in it would compare wrong, so we have to instead add \x00\x00, and
+        // encode \x00 as \x00\x01. -- key_encoder.h
+        buf.write(0x01);
+      }
+    }
+
+    if (!isLast) {
+      buf.write(0x00);
+      buf.write(0x00);
+    }
   }
 
   /**
@@ -136,9 +154,8 @@ class KeyEncoder {
    * @param offset the offset into the {@code value} buffer that the value begins
    * @param len the length of the value
    * @param type the type of the value to encode
-   * @param isLast whether the value is the final component in the key
    */
-  private void addComponent(byte[] value, int offset, int len, Type type, boolean isLast) {
+  private void addComponent(byte[] value, int offset, int len, Type type) {
     switch (type) {
       case BOOL:
         assert len == 1;
@@ -149,7 +166,7 @@ class KeyEncoder {
       case INT32:
       case INT64:
       case TIMESTAMP:
-        // picking the first byte because big endian
+        // Picking the first byte because big endian.
         byte lastByte = value[offset + (len - 1)];
         lastByte = Bytes.xorLeftMostBit(lastByte);
         buf.write(lastByte);
@@ -159,27 +176,6 @@ class KeyEncoder {
           }
         }
         break;
-      case BINARY:
-      case STRING:
-        // if this is the last component, just add
-        if (isLast) {
-          buf.write(value, offset, len);
-        } else {
-          // If we're a middle component of a composite key, we need to add a \x00
-          // at the end in order to separate this component from the next one. However,
-          // if we just did that, we'd have issues where a key that actually has
-          // \x00 in it would compare wrong, so we have to instead add \x00\x00, and
-          // encode \x00 as \x00\x01. -- key_encoder.h
-          for (int b = offset; b < (offset + len); b++) {
-            buf.write(value[b]);
-            if (value[b] == 0x00) {
-              buf.write(0x01);
-            }
-          }
-          buf.write(0x00);
-          buf.write(0x00);
-        }
-        break;
       default:
         throw new IllegalArgumentException(String.format(
             "The column type %s is not a valid key component type", type));

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/170c32c0/java/kudu-client/src/main/java/org/kududb/client/Operation.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/Operation.java b/java/kudu-client/src/main/java/org/kududb/client/Operation.java
index 7c7e244..741acb3 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/Operation.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/Operation.java
@@ -13,7 +13,6 @@
 // limitations under the License.
 package org.kududb.client;
 
-import com.google.common.primitives.Longs;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
 import com.google.protobuf.ZeroCopyLiteralByteString;
@@ -29,9 +28,9 @@ import org.kududb.util.Pair;
 import org.jboss.netty.buffer.ChannelBuffer;
 
 import java.io.ByteArrayOutputStream;
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -170,7 +169,10 @@ public abstract class Operation extends KuduRpc<OperationResponse> implements Ku
   static class OperationsEncoder {
     private Schema schema;
     private ByteBuffer rows;
-    private ByteArrayOutputStream indirect;
+    // We're filling this list as we go through the operations in encodeRow() and at the same time
+    // compute the total size, which will be used to right-size the array in toPB().
+    private List<ByteBuffer> indirect;
+    private long indirectWrittenBytes;
 
     /**
      * Initializes the state of the encoder based on the schema and number of operations to encode.
@@ -195,7 +197,7 @@ public abstract class Operation extends KuduRpc<OperationResponse> implements Ku
       // instead of a doubling buffer like BAOS.
       this.rows = ByteBuffer.allocate(sizePerRow * numOperations)
                             .order(ByteOrder.LITTLE_ENDIAN);
-      this.indirect = new ByteArrayOutputStream();
+      this.indirect = new ArrayList<>(schema.getVarLengthColumnCount() * numOperations);
     }
 
     /**
@@ -214,7 +216,14 @@ public abstract class Operation extends KuduRpc<OperationResponse> implements Ku
       if (indirect.size() > 0) {
         // TODO: same as above, we could avoid a copy here by using an implementation that allows
         // zero-copy on a slice of an array.
-        rowOpsBuilder.setIndirectData(ZeroCopyLiteralByteString.wrap(indirect.toByteArray()));
+        byte[] indirectData = new byte[(int)indirectWrittenBytes];
+        int offset = 0;
+        for (ByteBuffer bb : indirect) {
+          int bbSize = bb.remaining();
+          bb.get(indirectData, offset, bbSize);
+          offset += bbSize;
+        }
+        rowOpsBuilder.setIndirectData(ZeroCopyLiteralByteString.wrap(indirectData));
       }
       return rowOpsBuilder.build();
     }
@@ -232,14 +241,13 @@ public abstract class Operation extends KuduRpc<OperationResponse> implements Ku
         // Keys should always be specified, maybe check?
         if (row.isSet(colIdx) && !row.isSetToNull(colIdx)) {
           if (col.getType() == Type.STRING || col.getType() == Type.BINARY) {
-            byte[] varLengthData = row.getVarLengthData().get(colIdx);
-            rows.putLong(indirect.size());
-            rows.putLong(varLengthData.length);
-            try {
-              indirect.write(varLengthData);
-            } catch (IOException e) {
-              throw new AssertionError(e); // cannot occur
-            }
+            ByteBuffer varLengthData = row.getVarLengthData().get(colIdx);
+            varLengthData.reset();
+            rows.putLong(indirectWrittenBytes);
+            int bbSize = varLengthData.remaining();
+            rows.putLong(bbSize);
+            indirect.add(varLengthData);
+            indirectWrittenBytes += bbSize;
           } else {
             // This is for cols other than strings
             rows.put(rowData, currentRowOffset, col.getType().getSize());

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/170c32c0/java/kudu-client/src/main/java/org/kududb/client/PartialRow.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/PartialRow.java b/java/kudu-client/src/main/java/org/kududb/client/PartialRow.java
index c3ae1ab..c13e894 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/PartialRow.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/PartialRow.java
@@ -13,6 +13,7 @@
 // limitations under the License.
 package org.kududb.client;
 
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.BitSet;
 import java.util.List;
@@ -25,24 +26,31 @@ import org.kududb.annotations.InterfaceAudience;
 import org.kududb.annotations.InterfaceStability;
 
 /**
- * Class used to represent parts of a row along with its schema.
+ * Class used to represent parts of a row along with its schema.<p>
  *
  * Values can be replaced as often as needed, but once the enclosing {@link Operation} is applied
- * then they cannot be changed again. This means that a PartialRow cannot be reused.
+ * then they cannot be changed again. This means that a PartialRow cannot be reused.<p>
  *
  * Each PartialRow is backed by an byte array where all the cells (except strings and binary data)
- * are written. The others are kept in a List.
+ * are written. The others are kept in a List.<p>
+ *
+ * This class isn't thread-safe.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public class PartialRow {
 
   private final Schema schema;
-  // Variable length data. If string, will be UTF-8 encoded.
-  private final List<byte[]> varLengthData;
+
+  // Variable length data. If string, will be UTF-8 encoded. Elements of this list _must_ have a
+  // mark that we can reset() to. Readers of these fields (encoders, etc) must call reset() before
+  // attempting to read these values.
+  private final List<ByteBuffer> varLengthData;
   private final byte[] rowAlloc;
+
   private final BitSet columnsBitSet;
   private final BitSet nullsBitSet;
+
   private boolean frozen = false;
 
   /**
@@ -57,7 +65,7 @@ public class PartialRow {
         new BitSet(this.schema.getColumnCount()) : null;
     this.rowAlloc = new byte[schema.getRowSize()];
     // Pre-fill the array with nulls. We'll only replace cells that have varlen values.
-    this.varLengthData = Arrays.asList(new byte[this.schema.getColumnCount()][]);
+    this.varLengthData = Arrays.asList(new ByteBuffer[this.schema.getColumnCount()]);
   }
 
   /**
@@ -68,8 +76,19 @@ public class PartialRow {
     this.schema = row.schema;
 
     this.varLengthData = Lists.newArrayListWithCapacity(row.varLengthData.size());
-    for (byte[] data: row.varLengthData) {
-      this.varLengthData.add(data == null ? null : data.clone());
+    for (ByteBuffer data: row.varLengthData) {
+      if (data == null) {
+        this.varLengthData.add(null);
+      } else {
+        data.reset();
+        // Deep copy the ByteBuffer.
+        ByteBuffer clone = ByteBuffer.allocate(data.remaining());
+        clone.put(data);
+        clone.flip();
+
+        clone.mark(); // We always expect a mark.
+        this.varLengthData.add(clone);
+      }
     }
 
     this.rowAlloc = row.rowAlloc.clone();
@@ -326,6 +345,21 @@ public class PartialRow {
   }
 
   /**
+   * Add binary data with the specified value, from the current ByteBuffer's position to its limit.
+   * This method duplicates the ByteBuffer but doesn't copy the data. This means that the wrapped
+   * data must not be mutated after this.
+   * @param columnIndex the column's index in the schema
+   * @param value byte buffer to get the value from
+   * @throws IllegalArgumentException if the column doesn't exist or if the value doesn't match
+   * the column's type
+   * @throws IllegalStateException if the row was already applied
+   */
+  public void addBinary(int columnIndex, ByteBuffer value) {
+    checkColumn(schema.getColumnByIndex(columnIndex), Type.BINARY);
+    addVarLengthData(columnIndex, value);
+  }
+
+  /**
    * Add binary data with the specified value.
    * Note that the provided value must not be mutated after this.
    * @param columnName Name of the column
@@ -338,8 +372,31 @@ public class PartialRow {
     addBinary(schema.getColumnIndex(columnName), val);
   }
 
+  /**
+   * Add binary data with the specified value, from the current ByteBuffer's position to its limit.
+   * This method duplicates the ByteBuffer but doesn't copy the data. This means that the wrapped
+   * data must not be mutated after this.
+   * @param columnName Name of the column
+   * @param value byte buffer to get the value from
+   * @throws IllegalArgumentException if the column doesn't exist or if the value doesn't match
+   * the column's type
+   * @throws IllegalStateException if the row was already applied
+   */
+  public void addBinary(String columnName, ByteBuffer value) {
+    addBinary(schema.getColumnIndex(columnName), value);
+  }
+
   private void addVarLengthData(int columnIndex, byte[] val) {
-    varLengthData.set(columnIndex, val);
+    addVarLengthData(columnIndex, ByteBuffer.wrap(val));
+  }
+
+  private void addVarLengthData(int columnIndex, ByteBuffer val) {
+    // A duplicate will copy all the original's metadata but still point to the same content.
+    ByteBuffer duplicate = val.duplicate();
+    // Mark the current position so we can reset to it.
+    duplicate.mark();
+
+    varLengthData.set(columnIndex, duplicate);
     // Set the usage bit but we don't care where it is.
     getPositionInRowAllocAndSetBitSet(columnIndex);
     // We don't set anything in row alloc, it will be managed at encoding time.
@@ -464,7 +521,7 @@ public class PartialRow {
    * Get the list variable length data cells that were added to this row.
    * @return a list of binary data, may be empty
    */
-  List<byte[]> getVarLengthData() {
+  List<ByteBuffer> getVarLengthData() {
     return varLengthData;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/170c32c0/java/kudu-client/src/main/java/org/kududb/client/RowResult.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/RowResult.java b/java/kudu-client/src/main/java/org/kududb/client/RowResult.java
index 2f005ad..503367f 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/RowResult.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/RowResult.java
@@ -363,8 +363,7 @@ public class RowResult {
   /**
    * Get the specified column's binary data.
    *
-   * This doesn't copy the data and instead returns a ByteBuffer that wraps it. The ByteBuffer
-   * is backed by 'indirectData' in this RowResult.
+   * This doesn't copy the data and instead returns a ByteBuffer that wraps it.
    *
    * @param columnName name of the column to get data for
    * @return a byte[] with the binary data.
@@ -378,8 +377,7 @@ public class RowResult {
   /**
    * Get the specified column's binary data.
    *
-   * This doesn't copy the data and instead returns a ByteBuffer that wraps it. The ByteBuffer
-   * is backed by 'indirectData' in this RowResult.
+   * This doesn't copy the data and instead returns a ByteBuffer that wraps it.
    *
    * @param columnIndex Column index in the schema
    * @return a byte[] with the binary data.

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/170c32c0/java/kudu-client/src/test/java/org/kududb/client/BaseKuduTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/kududb/client/BaseKuduTest.java b/java/kudu-client/src/test/java/org/kududb/client/BaseKuduTest.java
index e3d8435..f1d1ca1 100644
--- a/java/kudu-client/src/test/java/org/kududb/client/BaseKuduTest.java
+++ b/java/kudu-client/src/test/java/org/kududb/client/BaseKuduTest.java
@@ -240,7 +240,8 @@ public class BaseKuduTest {
             new ColumnSchema.ColumnSchemaBuilder("float", Type.FLOAT).build(),
             new ColumnSchema.ColumnSchemaBuilder("double", Type.DOUBLE).build(),
             new ColumnSchema.ColumnSchemaBuilder("string", Type.STRING).build(),
-            new ColumnSchema.ColumnSchemaBuilder("binary", Type.BINARY).build(),
+            new ColumnSchema.ColumnSchemaBuilder("binary-array", Type.BINARY).build(),
+            new ColumnSchema.ColumnSchemaBuilder("binary-bytebuffer", Type.BINARY).build(),
             new ColumnSchema.ColumnSchemaBuilder("null", Type.STRING).nullable(true).build(),
             new ColumnSchema.ColumnSchemaBuilder("timestamp", Type.TIMESTAMP).build());
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/170c32c0/java/kudu-client/src/test/java/org/kududb/client/TestRowResult.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/kududb/client/TestRowResult.java b/java/kudu-client/src/test/java/org/kududb/client/TestRowResult.java
index 840958a..ff19917 100644
--- a/java/kudu-client/src/test/java/org/kududb/client/TestRowResult.java
+++ b/java/kudu-client/src/test/java/org/kududb/client/TestRowResult.java
@@ -52,9 +52,12 @@ public class TestRowResult extends BaseKuduTest {
     row.addFloat(5, 5.6f);
     row.addDouble(6, 7.8);
     row.addString(7, "string-value");
-    row.addBinary(8, "binary-value".getBytes());
-    row.setNull(9);
-    row.addLong(10, 10l);
+    row.addBinary(8, "binary-array".getBytes());
+    ByteBuffer bb = ByteBuffer.wrap("binary-bytebuffer".getBytes());
+    bb.position(7); // We're only inserting the bytebuffer part of the original array.
+    row.addBinary(9, bb);
+    row.setNull(10);
+    row.addLong(11, 11l);
 
     KuduSession session = syncClient.newSession();
     session.apply(insert);
@@ -89,21 +92,23 @@ public class TestRowResult extends BaseKuduTest {
       assertEquals("string-value", rr.getString(7));
       assertEquals("string-value", rr.getString(allTypesSchema.getColumnByIndex(7).getName()));
 
-      assertArrayEquals("binary-value".getBytes(), rr.getBinaryCopy(8));
-      assertArrayEquals("binary-value".getBytes(),
+      assertArrayEquals("binary-array".getBytes(), rr.getBinaryCopy(8));
+      assertArrayEquals("binary-array".getBytes(),
           rr.getBinaryCopy(allTypesSchema.getColumnByIndex(8).getName()));
 
       ByteBuffer buffer = rr.getBinary(8);
       assertEquals(buffer, rr.getBinary(allTypesSchema.getColumnByIndex(8).getName()));
       byte[] binaryValue = new byte[buffer.remaining()];
       buffer.get(binaryValue);
-      assertArrayEquals("binary-value".getBytes(), binaryValue);
+      assertArrayEquals("binary-array".getBytes(), binaryValue);
 
-      assertEquals(true, rr.isNull(9));
-      assertEquals(true, rr.isNull(allTypesSchema.getColumnByIndex(9).getName()));
+      assertArrayEquals("bytebuffer".getBytes(), rr.getBinaryCopy(9));
 
-      assertEquals(10, rr.getLong(10));
-      assertEquals(10, rr.getLong(allTypesSchema.getColumnByIndex(10).getName()));
+      assertEquals(true, rr.isNull(10));
+      assertEquals(true, rr.isNull(allTypesSchema.getColumnByIndex(10).getName()));
+
+      assertEquals(11, rr.getLong(11));
+      assertEquals(11, rr.getLong(allTypesSchema.getColumnByIndex(11).getName()));
 
       // We test with the column name once since it's the same method for all types, unlike above.
       assertEquals(Type.INT8, rr.getColumnType(allTypesSchema.getColumnByIndex(0).getName()));
@@ -116,7 +121,7 @@ public class TestRowResult extends BaseKuduTest {
       assertEquals(Type.DOUBLE, rr.getColumnType(6));
       assertEquals(Type.STRING, rr.getColumnType(7));
       assertEquals(Type.BINARY, rr.getColumnType(8));
-      assertEquals(Type.TIMESTAMP, rr.getColumnType(10));
+      assertEquals(Type.TIMESTAMP, rr.getColumnType(11));
     }
   }
 }


[2/3] incubator-kudu git commit: [consensus] Make some logs less spammy

Posted by to...@apache.org.
[consensus] Make some logs less spammy

We're running some chaos monkey-like workloads and we're getting into
situations where we're able to generate hundreds of MBs of logs per
minute. This patch fixes two such logs.

Change-Id: Ib41166786bfb1a35ebb7acb721a69cbc7ecc28a4
Reviewed-on: http://gerrit.cloudera.org:8080/1674
Tested-by: Jean-Daniel Cryans
Reviewed-by: Mike Percy <mp...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/c367baa6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/c367baa6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/c367baa6

Branch: refs/heads/master
Commit: c367baa61a7e1b6c69fbde1fd13be5885277c19b
Parents: 170c32c
Author: Jean-Daniel Cryans <jd...@cloudera.com>
Authored: Mon Dec 21 10:43:01 2015 -0800
Committer: Jean-Daniel Cryans <jd...@gerrit.cloudera.org>
Committed: Wed Dec 23 16:44:57 2015 +0000

----------------------------------------------------------------------
 src/kudu/consensus/raft_consensus.cc       | 9 +++++++--
 src/kudu/consensus/raft_consensus_state.cc | 3 ++-
 2 files changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/c367baa6/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index e81b8e7..a675065 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -1107,11 +1107,16 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
     // that were actually prepared, and deleting the other ones since we've taken ownership
     // when we first deduped.
     if (iter != deduped_req.messages.end()) {
+      bool need_to_warn = true;
       while (iter != deduped_req.messages.end()) {
         ReplicateRefPtr msg = (*iter);
         iter = deduped_req.messages.erase(iter);
-        LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Could not prepare transaction for op: "
-            << msg->get()->id() << ". Status: " << prepare_status.ToString();
+        if (need_to_warn) {
+          need_to_warn = false;
+          LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Could not prepare transaction for op: "
+              << msg->get()->id() << ". Suppressed " << deduped_req.messages.size() <<
+              " other warnings. Status for this op: " << prepare_status.ToString();
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/c367baa6/src/kudu/consensus/raft_consensus_state.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_state.cc b/src/kudu/consensus/raft_consensus_state.cc
index 8495b5b..8127aec 100644
--- a/src/kudu/consensus/raft_consensus_state.cc
+++ b/src/kudu/consensus/raft_consensus_state.cc
@@ -523,7 +523,8 @@ Status ReplicaState::UpdateMajorityReplicatedUnlocked(const OpId& majority_repli
   }
 
   committed_index->CopyFrom(last_committed_index_);
-  LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Can't advance the committed index across term boundaries"
+  KLOG_EVERY_N_SECS(WARNING, 1) << LogPrefixUnlocked()
+          << "Can't advance the committed index across term boundaries"
           << " until operations from the current term are replicated."
           << " Last committed operation was: " << last_committed_index_.ShortDebugString() << ","
           << " New majority replicated is: " << majority_replicated.ShortDebugString() << ","


[3/3] incubator-kudu git commit: [java client] - Current batch may get stuck if inflight batch got error

Posted by to...@apache.org.
[java client] - Current batch may get stuck if inflight batch got error

This commit fixes a bug in the java client which may cause write operations
to be stuck forever. Before this commit, if a batch flush was scheduled but
there was already an inflight batch in progress, we added a callback to
retry flushing the batch after the inflight batch successfully completed,
but if the inflight batch had an error, the default PASSTHROUGH errback would
be called and this batch would never be flushed. This commit adds retry
flushing to both the callback and the errback.

Also, more information is added to the tracing log to help with debugging.

Change-Id: Icadfe78a1d70ac6490d36b0801c6d4fa30939955
Reviewed-on: http://gerrit.cloudera.org:8080/1565
Reviewed-by: Mike Percy <mp...@cloudera.com>
Tested-by: Mike Percy <mp...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/63b9268f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/63b9268f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/63b9268f

Branch: refs/heads/master
Commit: 63b9268f90221570cf753ca706f1d22d0932f408
Parents: c367baa
Author: Binglin Chang <de...@gmail.com>
Authored: Mon Nov 30 13:42:46 2015 +0800
Committer: Mike Percy <mp...@cloudera.com>
Committed: Tue Jan 5 09:34:36 2016 +0000

----------------------------------------------------------------------
 .../org/kududb/client/AsyncKuduSession.java     | 76 +++++++++++++++++---
 .../src/main/java/org/kududb/client/Batch.java  | 33 +++++++++
 .../org/kududb/client/TestAsyncKuduSession.java | 53 ++++++++++++++
 3 files changed, 153 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/63b9268f/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduSession.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduSession.java b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduSession.java
index ca4e6b2..7bd2533 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduSession.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduSession.java
@@ -627,14 +627,27 @@ public class AsyncKuduSession implements SessionConfiguration {
       // was already flushed.
       if (operations.get(tablet) != expectedBatch) {
         LOG.trace("Had to flush a tablet but it was already flushed: " + Bytes.getString(tablet));
+        // It is OK to return null here, since we currently do not use the returned value
+        // when doing background flush or auto flushing when buffer is full.
+        // The returned value is used when doing manual flush, but it will not run into this
+        // condition, or there is a bug.
         return Deferred.fromResult(null);
       }
 
       if (operationsInFlight.containsKey(tablet)) {
-        LOG.trace("This tablet is already in flight, attaching a callback to retry later: " +
-            Bytes.getString(tablet));
-        return operationsInFlight.get(tablet).addCallbackDeferring(
-            new FlushRetryCallback(tablet, operations.get(tablet)));
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Tablet " + Bytes.getString(tablet)
+              + " is already in flight, attaching a callback to retry "
+              + expectedBatch.toDebugString() + " later.");
+        }
+        // No matter previous batch get error or not, we still have to flush this batch.
+        FlushRetryCallback retryCallback = new FlushRetryCallback(tablet, operations.get(tablet));
+        FlushRetryErrback retryErrback = new FlushRetryErrback(tablet, operations.get(tablet));
+        // Note that if we do manual flushing multiple times when previous batch is still inflight,
+        // we may add the same callback multiple times, later retry of flushTablet will return null
+        // immediately. Since it is an illegal use case, we do not handle this currently.
+        operationsInFlight.get(tablet).addCallbacks(retryCallback, retryErrback);
+        return expectedBatch.getDeferred();
       }
 
       batch = operations.remove(tablet);
@@ -660,7 +673,7 @@ public class AsyncKuduSession implements SessionConfiguration {
    * Simple callback so that we try to flush this tablet again if we were waiting on the previous
    * Batch to finish.
    */
-  class FlushRetryCallback implements Callback<Deferred<BatchResponse>, BatchResponse> {
+  class FlushRetryCallback implements Callback<BatchResponse, BatchResponse> {
     private final Slice tablet;
     private final Batch expectedBatch;
     public FlushRetryCallback(Slice tablet, Batch expectedBatch) {
@@ -669,10 +682,45 @@ public class AsyncKuduSession implements SessionConfiguration {
     }
 
     @Override
-    public Deferred<BatchResponse> call(BatchResponse o) throws Exception {
-      LOG.trace("Previous batch in flight is done, flushing this tablet again: " +
-          Bytes.getString(tablet));
-      return flushTablet(tablet, expectedBatch);
+    public BatchResponse call(BatchResponse o) throws Exception {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Previous batch in flight is done. " + toString());
+      }
+      flushTablet(tablet, expectedBatch);
+      return o;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("FlushRetryCallback: retry flush tablet %s %s", Bytes.getString(tablet),
+          expectedBatch.toDebugString());
+    }
+  }
+
+  /**
+   * Same callback as above FlushRetryCallback, for the case that previous batch has error.
+   */
+  class FlushRetryErrback implements Callback<Exception, Exception> {
+    private final Slice tablet;
+    private final Batch expectedBatch;
+    public FlushRetryErrback(Slice tablet, Batch expectedBatch) {
+      this.tablet = tablet;
+      this.expectedBatch = expectedBatch;
+    }
+
+    @Override
+    public Exception call(Exception e) throws Exception {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Previous batch ended with an error. " + toString());
+      }
+      flushTablet(tablet, expectedBatch);
+      return e;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("FlushRetryErrback: retry flush tablet %s %s", Bytes.getString(tablet),
+          expectedBatch.toDebugString());
     }
   }
 
@@ -687,6 +735,11 @@ public class AsyncKuduSession implements SessionConfiguration {
         tabletInFlightDone(tablet);
         return o;
       }
+
+      @Override
+      public String toString() {
+        return "callback: mark tablet " + Bytes.getString(tablet) + " inflight done";
+      }
     };
   }
 
@@ -702,6 +755,11 @@ public class AsyncKuduSession implements SessionConfiguration {
         tabletInFlightDone(tablet);
         return e;
       }
+
+      @Override
+      public String toString() {
+        return "errback: mark tablet " + Bytes.getString(tablet) + " inflight done";
+      }
     };
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/63b9268f/java/kudu-client/src/main/java/org/kududb/client/Batch.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/Batch.java b/java/kudu-client/src/main/java/org/kududb/client/Batch.java
index aa806ec..e084933 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/Batch.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/Batch.java
@@ -13,11 +13,14 @@
 // limitations under the License.
 package org.kududb.client;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.Message;
 import com.google.protobuf.ZeroCopyLiteralByteString;
+
 import org.kududb.WireProtocol;
 import org.kududb.annotations.InterfaceAudience;
 import org.kududb.tserver.Tserver;
+import org.kududb.tserver.Tserver.TabletServerErrorPB;
 import org.kududb.util.Pair;
 import org.jboss.netty.buffer.ChannelBuffer;
 
@@ -100,6 +103,17 @@ class Batch extends KuduRpc<BatchResponse> implements KuduRpc.HasKey {
 
     BatchResponse response = new BatchResponse(deadlineTracker.getElapsedMillis(), tsUUID,
         builder.getTimestamp(), errorsPB, ops);
+
+    if (injectedError != null) {
+      if (injectedlatencyMs > 0) {
+        try {
+          Thread.sleep(injectedlatencyMs);
+        } catch (InterruptedException e) {
+        }
+      }
+      return new Pair<BatchResponse, Object>(response, injectedError);
+    }
+
     return new Pair<BatchResponse, Object>(response, builder.hasError() ? builder.getError() : null);
   }
 
@@ -109,6 +123,10 @@ class Batch extends KuduRpc<BatchResponse> implements KuduRpc.HasKey {
     return this.ops.get(0).partitionKey();
   }
 
+  public String toDebugString() {
+    return "Batch(" + ops.size() + " ops)@" + Integer.toHexString(hashCode());
+  }
+
   /**
    * Sorts the Operations by their sequence number.
    */
@@ -118,4 +136,19 @@ class Batch extends KuduRpc<BatchResponse> implements KuduRpc.HasKey {
       return Long.compare(o1.getSequenceNumber(), o2.getSequenceNumber());
     }
   }
+
+  private static TabletServerErrorPB injectedError;
+  private static int injectedlatencyMs;
+
+  /**
+   * Inject tablet server side error for Batch rpc related tests.
+   * @param error error response from tablet server
+   * @param latencyMs blocks response handling thread for some time to simulate
+   * write latency
+   */
+  @VisibleForTesting
+  static void injectTabletServerErrorAndLatency(TabletServerErrorPB error, int latencyMs) {
+    injectedError = error;
+    injectedlatencyMs = latencyMs;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/63b9268f/java/kudu-client/src/test/java/org/kududb/client/TestAsyncKuduSession.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/kududb/client/TestAsyncKuduSession.java b/java/kudu-client/src/test/java/org/kududb/client/TestAsyncKuduSession.java
index 66341fa..6a97f9d 100644
--- a/java/kudu-client/src/test/java/org/kududb/client/TestAsyncKuduSession.java
+++ b/java/kudu-client/src/test/java/org/kududb/client/TestAsyncKuduSession.java
@@ -14,8 +14,11 @@
 package org.kududb.client;
 
 import org.kududb.Schema;
+import org.kududb.WireProtocol.AppStatusPB;
+import org.kududb.tserver.Tserver.TabletServerErrorPB;
 import com.stumbleupon.async.Callback;
 import com.stumbleupon.async.Deferred;
+import com.stumbleupon.async.TimeoutException;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -51,6 +54,56 @@ public class TestAsyncKuduSession extends BaseKuduTest {
     table = createTable(TABLE_NAME, schema, new CreateTableOptions());
   }
 
+  /**
+   * Regression test for case where an error in the previous batch could cause the next
+   * batch to hang in flush()
+   */
+  @Test(timeout = 100000)
+  public void testBatchErrorCauseSessionStuck() throws Exception {
+    try {
+      AsyncKuduSession session = client.newSession();
+      session.setFlushMode(AsyncKuduSession.FlushMode.AUTO_FLUSH_BACKGROUND);
+      session.setFlushInterval(100);
+      TabletServerErrorPB error = TabletServerErrorPB.newBuilder()
+          .setCode(TabletServerErrorPB.Code.UNKNOWN_ERROR)
+          .setStatus(AppStatusPB.newBuilder()
+              .setCode(AppStatusPB.ErrorCode.UNKNOWN_ERROR)
+              .setMessage("injected error for test")
+              .build())
+          .build();
+      Batch.injectTabletServerErrorAndLatency(error, 200);
+      // 0ms: insert first row, which will be the first batch.
+      Deferred<OperationResponse> resp1 = session.apply(createInsert(1));
+      Thread.sleep(120);
+      // 100ms: start to send first batch.
+      // 100ms+: first batch got response from ts,
+      //         will wait 200ms and throw error.
+      // 120ms: insert another row, which will be the second batch.
+      Deferred<OperationResponse> resp2 = session.apply(createInsert(2));
+      // 220ms: start to send the second batch, but first batch is inflight,
+      //        so add callback to retry after first batch finishes.
+      // 300ms: first batch's callback handles error, retry second batch.
+      try {
+        resp1.join(2000);
+      } catch (TimeoutException e) {
+        fail("First batch should not timeout in case of tablet server error");
+      } catch (TabletServerErrorException e) {
+        // Expected.
+        assertTrue(e.getMessage().contains("injected error for test"));
+      }
+      try {
+        resp2.join(2000);
+      } catch (TimeoutException e) {
+        fail("Second batch should not timeout in case of tablet server error");
+      } catch (TabletServerErrorException e) {
+        // expected
+        assertTrue(e.getMessage().contains("injected error for test"));
+      }
+    } finally {
+      Batch.injectTabletServerErrorAndLatency(null, 0);
+    }
+  }
+
   @Test(timeout = 100000)
   public void test() throws Exception {