You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by da...@apache.org on 2017/07/12 18:27:07 UTC

[2/3] kudu git commit: java: add a new test for multiple sessions on a single client

java: add a new test for multiple sessions on a single client

I was hoping this would reproduce KUDU-2053, a client bug that seems to
be hitting Spark upsert use cases, but it did not.
Nonetheless, we don't seem to have any test coverage of multithreaded
use of a single KuduClient with different KuduSession objects, so this
test is valuable.

Change-Id: Ife7f02b160d4635e8acb0155c98a1ef9c3dbab5e
Reviewed-on: http://gerrit.cloudera.org:8080/7361
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>
Tested-by: Jean-Daniel Cryans <jd...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/96cdcbd0
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/96cdcbd0
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/96cdcbd0

Branch: refs/heads/master
Commit: 96cdcbd00b148e123d25bc63969b43ad14d67551
Parents: ac2a285
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Jul 5 18:30:20 2017 -0700
Committer: Jean-Daniel Cryans <jd...@apache.org>
Committed: Wed Jul 12 18:14:49 2017 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/ITClientStress.java  | 59 +++++++++++++++++++-
 1 file changed, 56 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/96cdcbd0/java/kudu-client/src/test/java/org/apache/kudu/client/ITClientStress.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/ITClientStress.java b/java/kudu-client/src/test/java/org/apache/kudu/client/ITClientStress.java
index 89f2fa3..a3632c9 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/ITClientStress.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/ITClientStress.java
@@ -17,7 +17,6 @@
 package org.apache.kudu.client;
 
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.Closeable;
@@ -26,13 +25,14 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.base.Stopwatch;
 import com.google.common.base.Supplier;
 
 import org.junit.Test;
-
+import org.apache.kudu.client.SessionConfiguration.FlushMode;
 import org.apache.kudu.util.CapturingLogAppender;
 
 public class ITClientStress extends BaseKuduTest {
@@ -72,7 +72,9 @@ public class ITClientStress extends BaseKuduTest {
     } finally {
       pool.shutdown();
       assertTrue(pool.awaitTermination(10, TimeUnit.SECONDS));
-      assertNull(thrown.get());
+      if (thrown.get() != null) {
+        throw new AssertionError(thrown.get());
+      }
     }
     assertFalse("log contained NPE",
         cla.getAppendedText().contains("NullPointerException"));
@@ -111,4 +113,55 @@ public class ITClientStress extends BaseKuduTest {
       }
     });
   }
+
+  /**
+   * Stress test: each task opens its own KuduSession on one shared KuduClient and upserts
+   * 100 rows (column 0 = 0..99) through one shared KuduTable; any row error fails the task.
+   */
+  @Test(timeout=60000)
+  public void testMultipleSessions() throws Exception {
+    final String TABLE_NAME = "testMultipleSessions";
+    final int SECONDS_TO_RUN = 10; // presumably how long runTasks keeps going; confirm in runTasks
+    final int NUM_THREADS = 60; // presumably the worker-thread count used by runTasks; confirm there
+    final KuduTable table = createTable(TABLE_NAME, basicSchema,
+        getBasicCreateTableOptions());
+    final AtomicInteger numUpserted = new AtomicInteger(0); // bumped by every task, logged at the end
+    try (final KuduClient client = // client dedicated to this test; closed when the try block exits
+        new KuduClient.KuduClientBuilder(masterAddresses)
+        .defaultAdminOperationTimeoutMs(DEFAULT_SLEEP)
+        .build()) {
+
+      runTasks(NUM_THREADS, SECONDS_TO_RUN, new Supplier<Callable<Void>>() {
+        @Override
+        public Callable<Void> get() {
+          return new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+              KuduSession s = client.newSession(); // a fresh session for every task invocation
+              s.setFlushMode(FlushMode.AUTO_FLUSH_SYNC);
+              try {
+                for (int i = 0; i < 100; i++) { // every task reuses the same 0..99 values for column 0
+                  Upsert u = table.newUpsert();
+                  u.getRow().addInt(0, i);
+                  u.getRow().addInt(1, 12345);
+                  u.getRow().addInt(2, 3);
+                  u.getRow().setNull(3);
+                  u.getRow().addBoolean(4, false);
+                  OperationResponse apply = s.apply(u);
+                  if (apply.hasRowError()) { // checked right after each apply()
+                    throw new AssertionError(apply.getRowError().toString());
+                  }
+                  numUpserted.incrementAndGet(); // count only upserts that reported no row error
+                }
+              } finally {
+                s.close(); // close the session even when an upsert threw
+              }
+              return null; // Callable<Void>: nothing meaningful to return
+            }
+          };
+        }
+      });
+    }
+    LOG.info("Upserted {} rows", numUpserted.get());
+  }
 }