You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2016/03/24 21:13:45 UTC
incubator-kudu git commit: [java client] Allow ignore duplicate rows
in auto flush sync mode
Repository: incubator-kudu
Updated Branches:
refs/heads/master ef2b64b9f -> 7f3691a82
[java client] Allow ignore duplicate rows in auto flush sync mode
Previously ignoreAllDuplicateRows only works in Batch rpc, not Operation
rpc, this commit add IgnoreAllDuplicateRows check in Operation RPC too.
Change-Id: I671dd1bf537c2f5e6aa757883b350768081eb035
Reviewed-on: http://gerrit.cloudera.org:8080/2607
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/7f3691a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/7f3691a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/7f3691a8
Branch: refs/heads/master
Commit: 7f3691a826b9d27199319409f8d721ec1d08a8ba
Parents: ef2b64b
Author: Binglin Chang <de...@gmail.com>
Authored: Wed Mar 23 20:46:18 2016 +0800
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Thu Mar 24 20:13:21 2016 +0000
----------------------------------------------------------------------
.../org/kududb/client/AsyncKuduSession.java | 1 +
.../main/java/org/kududb/client/Operation.java | 13 ++++++++
.../java/org/kududb/client/TestKuduSession.java | 35 ++++++++++++++++++++
3 files changed, 49 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/7f3691a8/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduSession.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduSession.java b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduSession.java
index e5fdb42..e9a2a18 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduSession.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduSession.java
@@ -370,6 +370,7 @@ public class AsyncKuduSession implements SessionConfiguration {
operation.setTimeoutMillis(timeoutMs);
}
operation.setExternalConsistencyMode(this.consistencyMode);
+ operation.setIgnoreAllDuplicateRows(ignoreAllDuplicateRows);
return client.sendRpcToTablet(operation);
}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/7f3691a8/java/kudu-client/src/main/java/org/kududb/client/Operation.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/Operation.java b/java/kudu-client/src/main/java/org/kududb/client/Operation.java
index 61b7889..165f095 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/Operation.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/Operation.java
@@ -23,6 +23,7 @@ import com.google.protobuf.ZeroCopyLiteralByteString;
import org.kududb.ColumnSchema;
import org.kududb.Schema;
import org.kududb.Type;
+import org.kududb.WireProtocol;
import org.kududb.WireProtocol.RowOperationsPB;
import org.kududb.annotations.InterfaceAudience;
import org.kududb.annotations.InterfaceStability;
@@ -69,6 +70,9 @@ public abstract class Operation extends KuduRpc<OperationResponse> implements Ku
private final PartialRow row;
+ /** See {@link SessionConfiguration#setIgnoreAllDuplicateRows(boolean)} */
+ boolean ignoreAllDuplicateRows = false;
+
/**
* Package-private constructor. Subclasses need to be instantiated via AsyncKuduSession
* @param table table with the schema to use for this operation
@@ -78,6 +82,11 @@ public abstract class Operation extends KuduRpc<OperationResponse> implements Ku
this.row = table.getSchema().newPartialRow();
}
+ /** See {@link SessionConfiguration#setIgnoreAllDuplicateRows(boolean)} */
+ void setIgnoreAllDuplicateRows(boolean ignoreAllDuplicateRows) {
+ this.ignoreAllDuplicateRows = ignoreAllDuplicateRows;
+ }
+
/**
* Classes extending Operation need to have a specific ChangeType
* @return Operation's ChangeType
@@ -130,6 +139,10 @@ public abstract class Operation extends KuduRpc<OperationResponse> implements Ku
Tserver.WriteResponsePB.PerRowErrorPB error = null;
if (builder.getPerRowErrorsCount() != 0) {
error = builder.getPerRowErrors(0);
+ if (ignoreAllDuplicateRows &&
+ error.getError().getCode() == WireProtocol.AppStatusPB.ErrorCode.ALREADY_PRESENT) {
+ error = null;
+ }
}
OperationResponse response = new OperationResponse(deadlineTracker.getElapsedMillis(), tsUUID,
builder.getTimestamp(), this, error);
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/7f3691a8/java/kudu-client/src/test/java/org/kududb/client/TestKuduSession.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/kududb/client/TestKuduSession.java b/java/kudu-client/src/test/java/org/kududb/client/TestKuduSession.java
index 293c968..9ac856a 100644
--- a/java/kudu-client/src/test/java/org/kududb/client/TestKuduSession.java
+++ b/java/kudu-client/src/test/java/org/kududb/client/TestKuduSession.java
@@ -16,6 +16,9 @@
// under the License.
package org.kududb.client;
+import java.util.ArrayList;
+import java.util.List;
+
import org.junit.Test;
import static org.junit.Assert.*;
@@ -51,6 +54,38 @@ public class TestKuduSession extends BaseKuduTest {
}
@Test(timeout = 100000)
+ public void testIgnoreAllDuplicateRows() throws Exception {
+ String tableName = TABLE_NAME_PREFIX + "-testIgnoreAllDuplicateRows";
+ table = createTable(tableName, basicSchema, new CreateTableOptions());
+
+ KuduSession session = syncClient.newSession();
+ session.setIgnoreAllDuplicateRows(true);
+ for (int i = 0; i < 10; i++) {
+ session.apply(createInsert(i));
+ }
+ for (SessionConfiguration.FlushMode mode : SessionConfiguration.FlushMode.values()) {
+ session.setFlushMode(mode);
+ for (int i = 0; i < 10; i++) {
+ OperationResponse resp = session.apply(createInsert(i));
+ if (mode == SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC) {
+ assertFalse(resp.hasRowError());
+ }
+ }
+ if (mode == SessionConfiguration.FlushMode.MANUAL_FLUSH) {
+ List<OperationResponse> responses = session.flush();
+ for (OperationResponse resp : responses) {
+ assertFalse(resp.hasRowError());
+ }
+ } else if (mode == SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND) {
+ while (session.hasPendingOperations()) {
+ Thread.sleep(100);
+ }
+ assertEquals(0, session.countPendingErrors());
+ }
+ }
+ }
+
+ @Test(timeout = 100000)
public void testBatchWithSameRow() throws Exception {
String tableName = TABLE_NAME_PREFIX + "-testBatchWithSameRow";
table = createTable(tableName, basicSchema, new CreateTableOptions());