You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2016/03/24 21:13:45 UTC

incubator-kudu git commit: [java client] Allow ignore duplicate rows in auto flush sync mode

Repository: incubator-kudu
Updated Branches:
  refs/heads/master ef2b64b9f -> 7f3691a82


[java client] Allow ignore duplicate rows in auto flush sync mode

Previously, ignoreAllDuplicateRows only worked for Batch RPCs, not Operation
RPCs. This commit adds the ignoreAllDuplicateRows check to Operation RPCs too.

Change-Id: I671dd1bf537c2f5e6aa757883b350768081eb035
Reviewed-on: http://gerrit.cloudera.org:8080/2607
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/7f3691a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/7f3691a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/7f3691a8

Branch: refs/heads/master
Commit: 7f3691a826b9d27199319409f8d721ec1d08a8ba
Parents: ef2b64b
Author: Binglin Chang <de...@gmail.com>
Authored: Wed Mar 23 20:46:18 2016 +0800
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Thu Mar 24 20:13:21 2016 +0000

----------------------------------------------------------------------
 .../org/kududb/client/AsyncKuduSession.java     |  1 +
 .../main/java/org/kududb/client/Operation.java  | 13 ++++++++
 .../java/org/kududb/client/TestKuduSession.java | 35 ++++++++++++++++++++
 3 files changed, 49 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/7f3691a8/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduSession.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduSession.java b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduSession.java
index e5fdb42..e9a2a18 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduSession.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduSession.java
@@ -370,6 +370,7 @@ public class AsyncKuduSession implements SessionConfiguration {
         operation.setTimeoutMillis(timeoutMs);
       }
       operation.setExternalConsistencyMode(this.consistencyMode);
+      operation.setIgnoreAllDuplicateRows(ignoreAllDuplicateRows);
       return client.sendRpcToTablet(operation);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/7f3691a8/java/kudu-client/src/main/java/org/kududb/client/Operation.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/Operation.java b/java/kudu-client/src/main/java/org/kududb/client/Operation.java
index 61b7889..165f095 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/Operation.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/Operation.java
@@ -23,6 +23,7 @@ import com.google.protobuf.ZeroCopyLiteralByteString;
 import org.kududb.ColumnSchema;
 import org.kududb.Schema;
 import org.kududb.Type;
+import org.kududb.WireProtocol;
 import org.kududb.WireProtocol.RowOperationsPB;
 import org.kududb.annotations.InterfaceAudience;
 import org.kududb.annotations.InterfaceStability;
@@ -69,6 +70,9 @@ public abstract class Operation extends KuduRpc<OperationResponse> implements Ku
 
   private final PartialRow row;
 
+  /** See {@link SessionConfiguration#setIgnoreAllDuplicateRows(boolean)} */
+  boolean ignoreAllDuplicateRows = false;
+
   /**
    * Package-private constructor. Subclasses need to be instantiated via AsyncKuduSession
    * @param table table with the schema to use for this operation
@@ -78,6 +82,11 @@ public abstract class Operation extends KuduRpc<OperationResponse> implements Ku
     this.row = table.getSchema().newPartialRow();
   }
 
+  /** See {@link SessionConfiguration#setIgnoreAllDuplicateRows(boolean)} */
+  void setIgnoreAllDuplicateRows(boolean ignoreAllDuplicateRows) {
+    this.ignoreAllDuplicateRows = ignoreAllDuplicateRows;
+  }
+
   /**
    * Classes extending Operation need to have a specific ChangeType
    * @return Operation's ChangeType
@@ -130,6 +139,10 @@ public abstract class Operation extends KuduRpc<OperationResponse> implements Ku
     Tserver.WriteResponsePB.PerRowErrorPB error = null;
     if (builder.getPerRowErrorsCount() != 0) {
       error = builder.getPerRowErrors(0);
+      if (ignoreAllDuplicateRows &&
+          error.getError().getCode() == WireProtocol.AppStatusPB.ErrorCode.ALREADY_PRESENT) {
+        error = null;
+      }
     }
     OperationResponse response = new OperationResponse(deadlineTracker.getElapsedMillis(), tsUUID,
         builder.getTimestamp(), this, error);

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/7f3691a8/java/kudu-client/src/test/java/org/kududb/client/TestKuduSession.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/kududb/client/TestKuduSession.java b/java/kudu-client/src/test/java/org/kududb/client/TestKuduSession.java
index 293c968..9ac856a 100644
--- a/java/kudu-client/src/test/java/org/kududb/client/TestKuduSession.java
+++ b/java/kudu-client/src/test/java/org/kududb/client/TestKuduSession.java
@@ -16,6 +16,9 @@
 // under the License.
 package org.kududb.client;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.junit.Test;
 
 import static org.junit.Assert.*;
@@ -51,6 +54,38 @@ public class TestKuduSession extends BaseKuduTest {
   }
 
   @Test(timeout = 100000)
+  public void testIgnoreAllDuplicateRows() throws Exception {
+    String tableName = TABLE_NAME_PREFIX + "-testIgnoreAllDuplicateRows";
+    table = createTable(tableName, basicSchema, new CreateTableOptions());
+
+    KuduSession session = syncClient.newSession();
+    session.setIgnoreAllDuplicateRows(true);
+    for (int i = 0; i < 10; i++) {
+      session.apply(createInsert(i));
+    }
+    for (SessionConfiguration.FlushMode mode : SessionConfiguration.FlushMode.values()) {
+      session.setFlushMode(mode);
+      for (int i = 0; i < 10; i++) {
+        OperationResponse resp = session.apply(createInsert(i));
+        if (mode == SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC) {
+          assertFalse(resp.hasRowError());
+        }
+      }
+      if (mode == SessionConfiguration.FlushMode.MANUAL_FLUSH) {
+        List<OperationResponse> responses = session.flush();
+        for (OperationResponse resp : responses) {
+          assertFalse(resp.hasRowError());
+        }
+      } else if (mode == SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND) {
+        while (session.hasPendingOperations()) {
+          Thread.sleep(100);
+        }
+        assertEquals(0, session.countPendingErrors());
+      }
+    }
+  }
+
+  @Test(timeout = 100000)
   public void testBatchWithSameRow() throws Exception {
     String tableName = TABLE_NAME_PREFIX + "-testBatchWithSameRow";
     table = createTable(tableName, basicSchema, new CreateTableOptions());