You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2019/02/27 02:15:04 UTC

[kudu] 07/07: KUDU-1868 Part 2: Eliminate socket read timeout except for negotiation

This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 853d3bfd1f5c5c00a069da3b04706e4ef0914b7d
Author: Will Berkeley <wd...@gmail.org>
AuthorDate: Mon Feb 4 18:04:50 2019 -0800

    KUDU-1868 Part 2: Eliminate socket read timeout except for negotiation
    
    This removes all use of socket read timeouts from the Java client and
    deprecates all public APIs involving the socket read timeout. The Java
    client does not use socket read timeouts anymore except to time out
    negotiation. This is OK because negotiation is the only activity on the
    socket while negotiation is in progress. Once negotiation succeeds, the socket
    read timeout handler is removed from the pipeline, and the timeout tasks
    from Part 1 handle RPC timeouts.
    
    The C++ client does not allow setting the negotiation timeout, and I
    chose not to expose it in the Java client because it didn't seem useful,
    so I set the timeout to 10000ms, which is much larger than the default
    for the C++ client. The C++ client uses 3000ms, which matches the
    server-side default; the larger Java value leaves some room for
    the server-side default to rise. I wrote a test case and checked that
    negotiation did time out when the constant was lowered to a small number
    for the purposes of the test.
    
    Change-Id: I391374dd72b6f4a91a9f69cf34758703afbdc59e
    Reviewed-on: http://gerrit.cloudera.org:8080/12363
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
    Tested-by: Kudu Jenkins
---
 .../scala/org/apache/kudu/backup/KuduBackup.scala  |  6 +--
 .../org/apache/kudu/client/AsyncKuduClient.java    | 28 +++++-----
 .../java/org/apache/kudu/client/Connection.java    | 31 +++++------
 .../org/apache/kudu/client/ConnectionCache.java    | 12 ++---
 .../java/org/apache/kudu/client/KuduClient.java    | 11 ++--
 .../test/java/org/apache/kudu/client/ITClient.java | 28 ++--------
 .../java/org/apache/kudu/client/TestTimeouts.java  | 61 ++++++++++++++--------
 .../apache/kudu/mapreduce/CommandLineParser.java   | 13 +++--
 .../org/apache/kudu/spark/kudu/DefaultSource.scala |  7 ++-
 .../org/apache/kudu/spark/kudu/KuduContext.scala   | 22 +++-----
 .../apache/kudu/spark/kudu/KuduReadOptions.scala   |  2 +-
 .../apache/kudu/spark/kudu/DefaultSourceTest.scala | 20 -------
 12 files changed, 98 insertions(+), 143 deletions(-)

diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
index 5a30f15..3280c0d 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
@@ -42,11 +42,7 @@ object KuduBackup {
     val context =
       new KuduContext(
         options.kuduMasterAddresses,
-        session.sparkContext,
-        // TODO: As a workaround for KUDU-1868 the socketReadTimeout is
-        // matched to the scanRequestTimeout. Without this
-        // "Invalid call sequence ID" errors can occur under heavy load.
-        Some(options.scanRequestTimeoutMs)
+        session.sparkContext
       )
     val path = options.path
     log.info(s"Backing up to path: $path")
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index ae2482f..20949a3 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -263,7 +263,6 @@ public class AsyncKuduClient implements AutoCloseable {
   public static final byte[] EMPTY_ARRAY = new byte[0];
   public static final long NO_TIMESTAMP = -1;
   public static final long DEFAULT_OPERATION_TIMEOUT_MS = 30000;
-  public static final long DEFAULT_SOCKET_READ_TIMEOUT_MS = 10000;
   private static final long MAX_RPC_ATTEMPTS = 100;
 
   /**
@@ -344,8 +343,6 @@ public class AsyncKuduClient implements AutoCloseable {
 
   private final long defaultAdminOperationTimeoutMs;
 
-  private final long defaultSocketReadTimeoutMs;
-
   private final Statistics statistics;
 
   private final boolean statisticsDisabled;
@@ -367,7 +364,6 @@ public class AsyncKuduClient implements AutoCloseable {
         MASTER_TABLE_NAME_PLACEHOLDER, null, null, 1);
     this.defaultOperationTimeoutMs = b.defaultOperationTimeoutMs;
     this.defaultAdminOperationTimeoutMs = b.defaultAdminOperationTimeoutMs;
-    this.defaultSocketReadTimeoutMs = b.defaultSocketReadTimeoutMs;
     this.statisticsDisabled = b.statisticsDisabled;
     this.statistics = statisticsDisabled ? null : new Statistics();
     this.timer = b.timer;
@@ -375,7 +371,7 @@ public class AsyncKuduClient implements AutoCloseable {
 
     this.securityContext = new SecurityContext();
     this.connectionCache = new ConnectionCache(
-        securityContext, defaultSocketReadTimeoutMs, timer, channelFactory);
+        securityContext, timer, channelFactory);
     this.tokenReacquirer = new AuthnTokenReacquirer(this);
   }
 
@@ -1039,12 +1035,14 @@ public class AsyncKuduClient implements AutoCloseable {
   }
 
   /**
-   * Get the timeout used when waiting to read data from a socket. Will be triggered when nothing
-   * has been read on a socket connected to a tablet server for {@code timeout} milliseconds.
+   * Socket read timeouts are no longer used in the Java client and have no effect.
+   * This method always returns 0, as that previously indicated no socket read timeout.
    * @return a timeout in milliseconds
+   * @deprecated socket read timeouts are no longer used
    */
-  public long getDefaultSocketReadTimeoutMs() {
-    return defaultSocketReadTimeoutMs;
+  @Deprecated public long getDefaultSocketReadTimeoutMs() {
+    LOG.info("getDefaultSocketReadTimeoutMs is deprecated");
+    return 0;
   }
 
   /**
@@ -2440,7 +2438,6 @@ public class AsyncKuduClient implements AutoCloseable {
     private final List<HostAndPort> masterAddresses;
     private long defaultAdminOperationTimeoutMs = DEFAULT_OPERATION_TIMEOUT_MS;
     private long defaultOperationTimeoutMs = DEFAULT_OPERATION_TIMEOUT_MS;
-    private long defaultSocketReadTimeoutMs = DEFAULT_SOCKET_READ_TIMEOUT_MS;
 
     private final HashedWheelTimer timer =
         new HashedWheelTimer(new ThreadFactoryBuilder().setDaemon(true).build(), 20, MILLISECONDS);
@@ -2512,15 +2509,14 @@ public class AsyncKuduClient implements AutoCloseable {
     }
 
     /**
-     * Sets the default timeout to use when waiting on data from a socket.
-     * Optional.
-     * If not provided, defaults to 10s.
-     * A value of 0 disables the timeout.
+     * Socket read timeouts are no longer used in the Java client and have no effect.
+     * Setting this has no effect.
      * @param timeoutMs a timeout in milliseconds
      * @return this builder
+     * @deprecated this option no longer has any effect
      */
-    public AsyncKuduClientBuilder defaultSocketReadTimeoutMs(long timeoutMs) {
-      this.defaultSocketReadTimeoutMs = timeoutMs;
+    @Deprecated public AsyncKuduClientBuilder defaultSocketReadTimeoutMs(long timeoutMs) {
+      LOG.info("defaultSocketReadTimeoutMs is deprecated");
       return this;
     }
 
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
index 27c5d8f..ba188f0 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
@@ -75,8 +75,6 @@ import org.apache.kudu.rpc.RpcHeader.RpcFeatureFlag;
  * Acquiring the monitor on an object of this class will prevent it from
  * accepting write requests as well as buffering requests if the underlying
  * channel isn't connected.
- *
- * TODO(aserbin) clarify on the socketReadTimeoutMs and using per-RPC timeout settings.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
@@ -106,9 +104,6 @@ class Connection extends SimpleChannelUpstreamHandler {
   /** Security context to use for connection negotiation. */
   private final SecurityContext securityContext;
 
-  /** Read timeout for the connection (used by Netty's ReadTimeoutHandler). */
-  private final long socketReadTimeoutMs;
-
   /** Timer to monitor read timeouts for the connection (used by Netty's ReadTimeoutHandler). */
   private final HashedWheelTimer timer;
 
@@ -136,6 +131,9 @@ class Connection extends SimpleChannelUpstreamHandler {
       0
   };
 
+  private static final String NEGOTIATION_TIMEOUT_HANDLER = "negotiation-timeout-handler";
+  private static final long NEGOTIATION_TIMEOUT_MS = 10000;
+
   /** Lock to guard access to some of the fields below. */
   private final ReentrantLock lock = new ReentrantLock();
 
@@ -178,7 +176,6 @@ class Connection extends SimpleChannelUpstreamHandler {
    *
    * @param serverInfo the destination server
    * @param securityContext security context to use for connection negotiation
-   * @param socketReadTimeoutMs timeout for the read operations on the socket
    * @param timer timer to set up read timeout on the corresponding Netty channel
    * @param channelFactory Netty factory to create corresponding Netty channel
    * @param credentialsPolicy policy controlling which credentials to use while negotiating on the
@@ -188,14 +185,12 @@ class Connection extends SimpleChannelUpstreamHandler {
    */
   Connection(ServerInfo serverInfo,
              SecurityContext securityContext,
-             long socketReadTimeoutMs,
              HashedWheelTimer timer,
              ClientSocketChannelFactory channelFactory,
              CredentialsPolicy credentialsPolicy) {
     this.serverInfo = serverInfo;
     this.securityContext = securityContext;
     this.state = State.NEW;
-    this.socketReadTimeoutMs = socketReadTimeoutMs;
     this.timer = timer;
     this.credentialsPolicy = credentialsPolicy;
 
@@ -319,6 +314,10 @@ class Connection extends SimpleChannelUpstreamHandler {
         Preconditions.checkState(state == State.NEGOTIATING);
 
         queuedMessages = null;
+
+        // Drop the negotiation timeout handler from the pipeline.
+        ctx.getPipeline().remove(NEGOTIATION_TIMEOUT_HANDLER);
+
         // Set the state to READY -- that means the incoming messages should be no longer put into
         // the queuedMessages, but sent to wire right away (see the enqueueMessage() for details).
         state = State.READY;
@@ -532,11 +531,9 @@ class Connection extends SimpleChannelUpstreamHandler {
       headerBuilder.setCallId(callId);
 
       // Amend the timeout for the call, if necessary.
-      if (socketReadTimeoutMs > 0) {
-        final int timeoutMs = headerBuilder.getTimeoutMillis();
-        if (timeoutMs > 0) {
-          headerBuilder.setTimeoutMillis((int) Math.min(timeoutMs, socketReadTimeoutMs));
-        }
+      final int timeoutMs = headerBuilder.getTimeoutMillis();
+      if (timeoutMs > 0) {
+        headerBuilder.setTimeoutMillis(timeoutMs);
       }
 
       // If the connection hasn't been negotiated yet, add the message into the queuedMessages list.
@@ -801,10 +798,10 @@ class Connection extends SimpleChannelUpstreamHandler {
           4 /* strip the length prefix */));
       super.addLast("decode-inbound", new CallResponse.Decoder());
       super.addLast("encode-outbound", new RpcOutboundMessage.Encoder());
-      if (Connection.this.socketReadTimeoutMs > 0) {
-        super.addLast("timeout-handler", new ReadTimeoutHandler(
-            Connection.this.timer, Connection.this.socketReadTimeoutMs, TimeUnit.MILLISECONDS));
-      }
+      // Add a socket read timeout handler to function as a timeout for negotiation.
+      // The handler will be removed once the connection is negotiated.
+      super.addLast(NEGOTIATION_TIMEOUT_HANDLER, new ReadTimeoutHandler(
+          Connection.this.timer, NEGOTIATION_TIMEOUT_MS, TimeUnit.MILLISECONDS));
       super.addLast("kudu-handler", Connection.this);
     }
   }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
index 2345b12..85be2cd 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
@@ -55,9 +55,6 @@ class ConnectionCache {
   /** Security context to use for connection negotiation. */
   private final SecurityContext securityContext;
 
-  /** Read timeout for connections (used by Netty's ReadTimeoutHandler) */
-  private final long socketReadTimeoutMs;
-
   /** Timer to monitor read timeouts for connections (used by Netty's ReadTimeoutHandler) */
   private final HashedWheelTimer timer;
 
@@ -75,11 +72,9 @@ class ConnectionCache {
 
   /** Create a new empty ConnectionCache given the specified parameters. */
   ConnectionCache(SecurityContext securityContext,
-                  long socketReadTimeoutMs,
                   HashedWheelTimer timer,
                   ClientSocketChannelFactory channelFactory) {
     this.securityContext = securityContext;
-    this.socketReadTimeoutMs = socketReadTimeoutMs;
     this.timer = timer;
     this.channelFactory = channelFactory;
   }
@@ -131,8 +126,11 @@ class ConnectionCache {
         }
       }
       if (result == null) {
-        result = new Connection(serverInfo, securityContext,
-            socketReadTimeoutMs, timer, channelFactory, credentialsPolicy);
+        result = new Connection(serverInfo,
+                                securityContext,
+                                timer,
+                                channelFactory,
+                                credentialsPolicy);
         connections.add(result);
         // There can be at most 2 connections to the same destination: one with primary and another
         // with secondary credentials.
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
index 1e14a8f..1ca2bbe 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
@@ -468,15 +468,14 @@ public class KuduClient implements AutoCloseable {
     }
 
     /**
-     * Sets the default timeout to use when waiting on data from a socket.
-     * Optional.
-     * If not provided, defaults to 10s.
-     * A value of 0 disables the timeout.
+     * Socket read timeouts are no longer used in the Java client and have no effect.
+     * Setting this has no effect.
      * @param timeoutMs a timeout in milliseconds
      * @return this builder
+     * @deprecated socket read timeouts are no longer used
      */
-    public KuduClientBuilder defaultSocketReadTimeoutMs(long timeoutMs) {
-      clientBuilder.defaultSocketReadTimeoutMs(timeoutMs);
+    @Deprecated public KuduClientBuilder defaultSocketReadTimeoutMs(long timeoutMs) {
+      LOG.info("defaultSocketReadTimeoutMs is deprecated");
       return this;
     }
 
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
index e28c1f2..e03ed4f 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
@@ -64,8 +64,6 @@ public class ITClient {
   // Latch used to track if an error occurred and we need to stop the test early.
   private final CountDownLatch errorLatch = new CountDownLatch(1);
 
-  private KuduClient localClient;
-  private AsyncKuduClient localAsyncClient;
   private KuduTable table;
   private long runtimeInSeconds;
 
@@ -87,25 +85,9 @@ public class ITClient {
 
     LOG.info ("Test running for {} seconds", runtimeInSeconds);
 
-    // Client we're using has low tolerance for read timeouts but a
-    // higher overall operation timeout.
-    localAsyncClient = new AsyncKuduClient.AsyncKuduClientBuilder(harness.getMasterAddressesAsString())
-        .defaultSocketReadTimeoutMs(500)
-        .build();
-    localClient = new KuduClient(localAsyncClient);
-
     CreateTableOptions builder = new CreateTableOptions().setNumReplicas(3);
     builder.setRangePartitionColumns(ImmutableList.of("key"));
-    table = localClient.createTable(TABLE_NAME, getBasicSchema(), builder);
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    if (localClient != null) {
-      localClient.shutdown();
-      // No need to explicitly shutdown the async client,
-      // shutting down the sync client effectively does that.
-    }
+    table = harness.getClient().createTable(TABLE_NAME, getBasicSchema(), builder);
   }
 
   @Test(timeout = TEST_TIMEOUT_SECONDS)
@@ -139,7 +121,7 @@ public class ITClient {
       thread.join(DEFAULT_SLEEP);
     }
 
-    AsyncKuduScanner scannerBuilder = localAsyncClient.newScannerBuilder(table).build();
+    AsyncKuduScanner scannerBuilder = harness.getAsyncClient().newScannerBuilder(table).build();
     int rowCount = countRowsInScan(scannerBuilder);
     Assert.assertTrue(rowCount + " should be higher than 0", rowCount > 0);
   }
@@ -199,7 +181,7 @@ public class ITClient {
      */
     private boolean disconnectNode() {
       try {
-        final List<Connection> connections = localAsyncClient.getConnectionListCopy();
+        final List<Connection> connections = harness.getAsyncClient().getConnectionListCopy();
         if (connections.isEmpty()) {
           return true;
         }
@@ -252,7 +234,7 @@ public class ITClient {
    */
   class WriterThread implements Runnable {
 
-    private final KuduSession session = localClient.newSession();
+    private final KuduSession session = harness.getClient().newSession();
     private final Random random = new Random();
     private int currentRowKey = 0;
 
@@ -432,7 +414,7 @@ public class ITClient {
     }
 
     private KuduScanner.KuduScannerBuilder getScannerBuilder() {
-      return localClient.newScannerBuilder(table)
+      return harness.getClient().newScannerBuilder(table)
           .readMode(AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT)
           .snapshotTimestampRaw(sharedWriteTimestamp)
           .setFaultTolerant(true);
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java
index 1d5d2c8..9b09fde 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java
@@ -24,7 +24,9 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import org.apache.kudu.test.KuduTestHarness;
+import org.apache.kudu.test.KuduTestHarness.MasterServerConfig;
 import org.apache.kudu.test.KuduTestHarness.TabletServerConfig;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -94,32 +96,47 @@ public class TestTimeouts {
         .apply(createBasicSchemaInsert(table, 0))
         .hasRowError());
 
-    // Create a new client with no socket read timeout (0 means do not set a read timeout).
-    try (KuduClient noRecvTimeoutClient =
+    // Do a non-scan operation to cache information from the master.
+    client.getTablesList();
+
+    // Scan with a short timeout.
+    KuduScanner scanner = client
+        .newScannerBuilder(table)
+        .scanRequestTimeout(1000)
+        .build();
+
+    // The server will not respond for the lifetime of the test, so we expect the
+    // operation to time out.
+    try {
+      scanner.nextRows();
+      fail("should not have completed nextRows");
+    } catch (NonRecoverableException e) {
+      assertTrue(e.getStatus().isTimedOut());
+    }
+  }
+
+  /**
+   * KUDU-1868: This tests that negotiation can time out on the client side. It passes if the
+   * hardcoded negotiation timeout is lowered to 500ms. In general it is hard to get it to work
+   * right because injecting latency to negotiation server side affects all client connections,
+   * including the harness's Java client, the kudu tool used to create the test cluster, and the
+   * other members of the test cluster. There isn't a way to configure the kudu tool's
+   * negotiation timeout within a Java test, presently.
+   */
+  @Test(timeout = 100000)
+  @Ignore
+  @MasterServerConfig(flags = { "--rpc_negotiation_inject_delay_ms=1000" })
+  public void testClientNegotiationTimeout() throws Exception {
+    // Make a new client so we can turn down the operation timeout-- otherwise this test takes 50s!
+    try (KuduClient lowTimeoutsClient =
              new KuduClient.KuduClientBuilder(harness.getMasterAddressesAsString())
-                 .defaultSocketReadTimeoutMs(0)
+                 .defaultAdminOperationTimeoutMs(5000)
                  .build()) {
-      // Propagate the timestamp to be sure we should see the row that was
-      // inserted by another client.
-      noRecvTimeoutClient.updateLastPropagatedTimestamp(client.getLastPropagatedTimestamp());
-      KuduTable noRecvTimeoutTable = noRecvTimeoutClient.openTable(TABLE_NAME);
-
-      // Do something besides a scan to cache table and tablet lookup.
-      noRecvTimeoutClient.getTablesList();
-
-      // Scan with a short timeout.
-      KuduScanner scanner = noRecvTimeoutClient
-          .newScannerBuilder(noRecvTimeoutTable)
-          .scanRequestTimeout(1000)
-          .build();
-
-      // The server will not respond for the lifetime of the test, so we expect
-      // the operation to time out.
       try {
-        scanner.nextRows();
-        fail("should not have completed nextRows");
+        lowTimeoutsClient.getTablesList();
+        fail("should not have completed getTablesList");
       } catch (NonRecoverableException e) {
-        assertTrue(e.getStatus().isTimedOut());
+        // Good.
       }
     }
   }
diff --git a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/CommandLineParser.java b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/CommandLineParser.java
index ba9d987..6d88b0a 100644
--- a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/CommandLineParser.java
+++ b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/CommandLineParser.java
@@ -47,8 +47,7 @@ public class CommandLineParser {
       AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS;
   public static final String ADMIN_OPERATION_TIMEOUT_MS_KEY = "kudu.admin.operation.timeout.ms";
   public static final String SOCKET_READ_TIMEOUT_MS_KEY = "kudu.socket.read.timeout.ms";
-  public static final long SOCKET_READ_TIMEOUT_MS_DEFAULT =
-      AsyncKuduClient.DEFAULT_SOCKET_READ_TIMEOUT_MS;
+  public static final long SOCKET_READ_TIMEOUT_MS_DEFAULT = 0;
   public static final String NUM_REPLICAS_KEY = "kudu.num.replicas";
   public static final int NUM_REPLICAS_DEFAULT = 3;
 
@@ -86,11 +85,13 @@ public class CommandLineParser {
   }
 
   /**
-   * Get the configured timeout for socket reads.
+   * Socket read timeouts are no longer used in the Java client.
+   * This method always returns 0, which previously indicated no socket read timeout.
    * @return a long that represents the passed timeout, or the default value
+   * @deprecated socket read timeouts no longer have any effect
    */
-  public long getSocketReadTimeoutMs() {
-    return conf.getLong(SOCKET_READ_TIMEOUT_MS_KEY, SOCKET_READ_TIMEOUT_MS_DEFAULT);
+  @Deprecated public long getSocketReadTimeoutMs() {
+    return 0;
   }
 
   /**
@@ -109,7 +110,6 @@ public class CommandLineParser {
     return new AsyncKuduClient.AsyncKuduClientBuilder(getMasterAddresses())
         .defaultOperationTimeoutMs(getOperationTimeoutMs())
         .defaultAdminOperationTimeoutMs(getAdminOperationTimeoutMs())
-        .defaultSocketReadTimeoutMs(getSocketReadTimeoutMs())
         .build();
   }
 
@@ -121,7 +121,6 @@ public class CommandLineParser {
     KuduClient c = new KuduClient.KuduClientBuilder(getMasterAddresses())
         .defaultOperationTimeoutMs(getOperationTimeoutMs())
         .defaultAdminOperationTimeoutMs(getAdminOperationTimeoutMs())
-        .defaultSocketReadTimeoutMs(getSocketReadTimeoutMs())
         .build();
     KuduTableMapReduceUtil.importCredentialsFromCurrentSubject(c);
     return c;
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
index 388fac1..47b5ddd 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
@@ -175,7 +175,6 @@ class DefaultSource
     val scanLocality =
       parameters.get(SCAN_LOCALITY).map(getScanLocalityType).getOrElse(defaultScanLocality)
     val scanRequestTimeoutMs = parameters.get(SCAN_REQUEST_TIMEOUT_MS).map(_.toLong)
-    val socketReadTimeoutMs = parameters.get(SOCKET_READ_TIMEOUT_MS).map(_.toLong)
     val keepAlivePeriodMs =
       parameters.get(KEEP_ALIVE_PERIOD_MS).map(_.toLong).getOrElse(defaultKeepAlivePeriodMs)
 
@@ -185,7 +184,7 @@ class DefaultSource
       faultTolerantScanner,
       keepAlivePeriodMs,
       scanRequestTimeoutMs,
-      socketReadTimeoutMs)
+      /* socketReadTimeoutMs= */ None)
   }
 
   private def getWriteOptions(parameters: Map[String, String]): KuduWriteOptions = {
@@ -253,7 +252,7 @@ class KuduRelation(
     extends BaseRelation with PrunedFilteredScan with InsertableRelation {
 
   private val context: KuduContext =
-    new KuduContext(masterAddrs, sqlContext.sparkContext, readOptions.socketReadTimeoutMs)
+    new KuduContext(masterAddrs, sqlContext.sparkContext)
 
   private val table: KuduTable = context.syncClient.openTable(tableName)
 
@@ -466,7 +465,7 @@ class KuduSink(
     extends Sink {
 
   private val context: KuduContext =
-    new KuduContext(masterAddrs, sqlContext.sparkContext, readOptions.socketReadTimeoutMs)
+    new KuduContext(masterAddrs, sqlContext.sparkContext)
 
   override def addBatch(batchId: Long, data: DataFrame): Unit = {
     context.writeRows(data, tableName, operationType, writeOptions)
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
index f3b190d..4fa59c1 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
@@ -133,7 +133,7 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
   @transient lazy val syncClient: KuduClient = asyncClient.syncClient()
 
   @transient lazy val asyncClient: AsyncKuduClient = {
-    val c = KuduClientCache.getAsyncClient(kuduMaster, socketReadTimeoutMs)
+    val c = KuduClientCache.getAsyncClient(kuduMaster)
     if (authnCredentials != null) {
       c.importAuthenticationCredentials(authnCredentials)
     }
@@ -445,7 +445,6 @@ private object KuduContext {
 private object KuduClientCache {
   val Log: Logger = LoggerFactory.getLogger(KuduClientCache.getClass)
 
-  private case class CacheKey(kuduMaster: String, socketReadTimeoutMs: Option[Long])
   private case class CacheValue(kuduClient: AsyncKuduClient, shutdownHookHandle: Runnable)
 
   /**
@@ -457,7 +456,7 @@ private object KuduClientCache {
    */
   private val ShutdownHookPriority = 100
 
-  private val clientCache = new mutable.HashMap[CacheKey, CacheValue]()
+  private val clientCache = new mutable.HashMap[String, CacheValue]()
 
   // Visible for testing.
   private[kudu] def clearCacheForTests() = {
@@ -476,25 +475,18 @@ private object KuduClientCache {
     clientCache.clear()
   }
 
-  def getAsyncClient(kuduMaster: String, socketReadTimeoutMs: Option[Long]): AsyncKuduClient = {
-    val cacheKey = CacheKey(kuduMaster, socketReadTimeoutMs)
+  def getAsyncClient(kuduMaster: String): AsyncKuduClient = {
     clientCache.synchronized {
-      if (!clientCache.contains(cacheKey)) {
-        val builder = new AsyncKuduClient.AsyncKuduClientBuilder(kuduMaster)
-        socketReadTimeoutMs match {
-          case Some(timeout) => builder.defaultSocketReadTimeoutMs(timeout)
-          case None =>
-        }
-
-        val asyncClient = builder.build()
+      if (!clientCache.contains(kuduMaster)) {
+        val asyncClient = new AsyncKuduClient.AsyncKuduClientBuilder(kuduMaster).build()
         val hookHandle = new Runnable {
           override def run(): Unit = asyncClient.close()
         }
         ShutdownHookManager.get().addShutdownHook(hookHandle, ShutdownHookPriority)
         val cacheValue = CacheValue(asyncClient, hookHandle)
-        clientCache.put(cacheKey, cacheValue)
+        clientCache.put(kuduMaster, cacheValue)
       }
-      return clientCache(cacheKey).kuduClient
+      return clientCache(kuduMaster).kuduClient
     }
   }
 }
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala
index a1983b5..82c3af8 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala
@@ -34,7 +34,7 @@ import org.apache.kudu.spark.kudu.KuduReadOptions._
  * @param keepAlivePeriodMs The period at which to send keep-alive requests to the tablet
  *                          server to ensure that scanners do not time out
  * @param scanRequestTimeoutMs Maximum time allowed per scan request, in milliseconds
- * @param socketReadTimeoutMs Maximum time allowed when waiting on data from a socket
+ * @param socketReadTimeoutMs This parameter is deprecated and has no effect
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
index ad2c110..cbc315e 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
@@ -951,24 +951,4 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
     val kuduRelation = kuduRelationFromDataFrame(dataFrame)
     assert(kuduRelation.readOptions.scanRequestTimeoutMs == Some(66666))
   }
-
-  /**
-   * Verify that the kudu.socketReadTimeoutMs parameter is parsed by the
-   * DefaultSource and makes it into the KuduRelation as a configuration
-   * parameter.
-   */
-  @Test
-  def testSocketReadTimeoutPropagation() {
-    kuduOptions = Map(
-      "kudu.table" -> tableName,
-      "kudu.master" -> harness.getMasterAddressesAsString,
-      "kudu.socketReadTimeoutMs" -> "66666")
-    // Even though we're ostensibly just checking for option propagation, the
-    // construction of 'dataFrame' involves a connection to the Kudu cluster,
-    // which is affected by the value of socketReadTimeoutMs. Therefore we must
-    // choose a value that's high enough to actually support establishing a connection.
-    val dataFrame = sqlContext.read.options(kuduOptions).format("kudu").load
-    val kuduRelation = kuduRelationFromDataFrame(dataFrame)
-    assert(kuduRelation.readOptions.socketReadTimeoutMs == Some(66666))
-  }
 }