You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by da...@apache.org on 2017/07/12 18:27:07 UTC
[2/3] kudu git commit: java: add a new test for multiple sessions on
a single client
java: add a new test for multiple sessions on a single client
I was hoping this would reproduce KUDU-2053, a client bug that seems to
be hitting Spark upsert use cases, but it didn't reproduce it.
Nonetheless, we don't seem to have any test coverage of multithreaded
use of a KuduClient with different KuduSession objects, so this test is
valuable.
Change-Id: Ife7f02b160d4635e8acb0155c98a1ef9c3dbab5e
Reviewed-on: http://gerrit.cloudera.org:8080/7361
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>
Tested-by: Jean-Daniel Cryans <jd...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/96cdcbd0
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/96cdcbd0
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/96cdcbd0
Branch: refs/heads/master
Commit: 96cdcbd00b148e123d25bc63969b43ad14d67551
Parents: ac2a285
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Jul 5 18:30:20 2017 -0700
Committer: Jean-Daniel Cryans <jd...@apache.org>
Committed: Wed Jul 12 18:14:49 2017 +0000
----------------------------------------------------------------------
.../org/apache/kudu/client/ITClientStress.java | 59 +++++++++++++++++++-
1 file changed, 56 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/96cdcbd0/java/kudu-client/src/test/java/org/apache/kudu/client/ITClientStress.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/ITClientStress.java b/java/kudu-client/src/test/java/org/apache/kudu/client/ITClientStress.java
index 89f2fa3..a3632c9 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/ITClientStress.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/ITClientStress.java
@@ -17,7 +17,6 @@
package org.apache.kudu.client;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.Closeable;
@@ -26,13 +25,14 @@ import java.util.concurrent.Callable;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import org.junit.Test;
-
+import org.apache.kudu.client.SessionConfiguration.FlushMode;
import org.apache.kudu.util.CapturingLogAppender;
public class ITClientStress extends BaseKuduTest {
@@ -72,7 +72,9 @@ public class ITClientStress extends BaseKuduTest {
} finally {
pool.shutdown();
assertTrue(pool.awaitTermination(10, TimeUnit.SECONDS));
- assertNull(thrown.get());
+ if (thrown.get() != null) {
+ throw new AssertionError(thrown.get());
+ }
}
assertFalse("log contained NPE",
cla.getAppendedText().contains("NullPointerException"));
@@ -111,4 +113,55 @@ public class ITClientStress extends BaseKuduTest {
}
});
}
+
+ /**
+ * Stress test which performs upserts from many sessions on different threads
+ * sharing the same KuduClient and KuduTable instance.
+ */
+ @Test(timeout=60000)
+ public void testMultipleSessions() throws Exception {
+ final String TABLE_NAME = "testMultipleSessions";
+ final int SECONDS_TO_RUN = 10;
+ final int NUM_THREADS = 60;
+ final KuduTable table = createTable(TABLE_NAME, basicSchema,
+ getBasicCreateTableOptions());
+ final AtomicInteger numUpserted = new AtomicInteger(0);
+ try (final KuduClient client =
+ new KuduClient.KuduClientBuilder(masterAddresses)
+ .defaultAdminOperationTimeoutMs(DEFAULT_SLEEP)
+ .build()) {
+
+ runTasks(NUM_THREADS, SECONDS_TO_RUN, new Supplier<Callable<Void>>() {
+ @Override
+ public Callable<Void> get() {
+ return new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ KuduSession s = client.newSession();
+ s.setFlushMode(FlushMode.AUTO_FLUSH_SYNC);
+ try {
+ for (int i = 0; i < 100; i++) {
+ Upsert u = table.newUpsert();
+ u.getRow().addInt(0, i);
+ u.getRow().addInt(1, 12345);
+ u.getRow().addInt(2, 3);
+ u.getRow().setNull(3);
+ u.getRow().addBoolean(4, false);
+ OperationResponse apply = s.apply(u);
+ if (apply.hasRowError()) {
+ throw new AssertionError(apply.getRowError().toString());
+ }
+ numUpserted.incrementAndGet();
+ }
+ } finally {
+ s.close();
+ }
+ return null;
+ }
+ };
+ }
+ });
+ }
+ LOG.info("Upserted {} rows", numUpserted.get());
+ }
}