Posted to commits@kudu.apache.org by aw...@apache.org on 2019/02/27 02:14:57 UTC

[kudu] branch master updated (877b9f6 -> 853d3bf)

This is an automated email from the ASF dual-hosted git repository.

awong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.


    from 877b9f6  KUDU-1868: Part 1: Add timer-based RPC timeouts
     new 06f8c17  make --location_mapping_cmd non-experimental
     new 6302811  KUDU-2710: Fix KeepAliveRequest retries
     new c1330bc  Fix potential duplicate scanner on /scans page
     new e144c14  test: deflake TabletReplicaTest.TestRollLogSegmentSchemaOnAlter
     new 2711b5c  Reduce startup log spam
     new 3698f38  Get rid of some uninteresting INFO log messages
     new 853d3bf  KUDU-1868 Part 2: Eliminate socket read timeout except for negotiation

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../scala/org/apache/kudu/backup/KuduBackup.scala  |  6 +--
 .../org/apache/kudu/client/AsyncKuduClient.java    | 28 +++++-----
 .../org/apache/kudu/client/AsyncKuduScanner.java   |  6 +++
 .../java/org/apache/kudu/client/Connection.java    | 31 +++++------
 .../org/apache/kudu/client/ConnectionCache.java    | 12 ++---
 .../java/org/apache/kudu/client/KuduClient.java    | 11 ++--
 .../main/java/org/apache/kudu/client/RpcProxy.java | 21 ++++++++
 .../test/java/org/apache/kudu/client/ITClient.java | 28 ++--------
 .../org/apache/kudu/client/TestKuduClient.java     | 10 +++-
 .../java/org/apache/kudu/client/TestTimeouts.java  | 61 ++++++++++++++--------
 .../apache/kudu/mapreduce/CommandLineParser.java   | 13 +++--
 .../org/apache/kudu/spark/kudu/DefaultSource.scala |  7 ++-
 .../org/apache/kudu/spark/kudu/KuduContext.scala   | 22 +++-----
 .../apache/kudu/spark/kudu/KuduReadOptions.scala   |  2 +-
 .../apache/kudu/spark/kudu/DefaultSourceTest.scala | 20 -------
 src/kudu/consensus/consensus_peers.cc              |  2 +-
 src/kudu/master/master_main.cc                     |  3 --
 src/kudu/master/ts_descriptor.cc                   |  1 -
 src/kudu/server/webserver.cc                       | 37 +++++++------
 src/kudu/tablet/tablet_bootstrap.cc                |  5 +-
 src/kudu/tablet/tablet_replica-test.cc             |  3 ++
 src/kudu/tserver/scanners.cc                       | 24 ++++++---
 src/kudu/tserver/tablet_server_main.cc             |  3 --
 src/kudu/tserver/ts_tablet_manager.cc              | 22 +++++---
 src/kudu/util/maintenance_manager.cc               |  2 +-
 25 files changed, 188 insertions(+), 192 deletions(-)


[kudu] 04/07: test: deflake TabletReplicaTest.TestRollLogSegmentSchemaOnAlter

Posted by aw...@apache.org.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit e144c14813d8df4c7c0477b1eabbcb9be9a9a9e8
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Tue Feb 26 13:06:52 2019 -0800

    test: deflake TabletReplicaTest.TestRollLogSegmentSchemaOnAlter
    
    We should wait for the replica to become leader so it's usable for
    writes. Without this fix, the test failed 81/1000 times in release mode,
    and passed 1000/1000 with it.
    
    Change-Id: Ie6016f717183ca7bcf98c0874a24f1463e113cef
    Reviewed-on: http://gerrit.cloudera.org:8080/12598
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/tablet/tablet_replica-test.cc | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/src/kudu/tablet/tablet_replica-test.cc b/src/kudu/tablet/tablet_replica-test.cc
index 4cf9f03..a0960cc 100644
--- a/src/kudu/tablet/tablet_replica-test.cc
+++ b/src/kudu/tablet/tablet_replica-test.cc
@@ -233,6 +233,9 @@ class TabletReplicaTest : public KuduTabletTest {
                                      scoped_refptr<ResultTracker>(),
                                      log,
                                      prepare_pool_.get()));
+    // Wait for the replica to be usable.
+    const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+    ASSERT_OK(tablet_replica_->consensus()->WaitUntilLeaderForTests(kTimeout));
   }
 
  protected:
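The fix makes the test block until the replica has become leader, giving up after 30 seconds, before it issues any writes. For readers more at home in the Java client, here is a minimal sketch of the same wait-with-deadline pattern. `WaitUtil.waitUntil` is a hypothetical helper, not part of Kudu, and the condition passed to it stands in for "this replica is now leader".

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

// Hypothetical helper illustrating the "wait until the replica is usable"
// step in Java. It is NOT Kudu's C++ WaitUntilLeaderForTests(); it just polls
// an arbitrary condition until it holds or a deadline passes.
final class WaitUtil {
  private WaitUtil() {}

  /**
   * Polls {@code condition} every {@code pollInterval} until it returns true.
   *
   * @throws TimeoutException if the condition is still false after {@code timeout}
   */
  static void waitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval)
      throws InterruptedException, TimeoutException {
    final long deadlineNanos = System.nanoTime() + timeout.toNanos();
    while (!condition.getAsBoolean()) {
      // Compare via subtraction so the check stays correct if nanoTime() wraps.
      if (System.nanoTime() - deadlineNanos >= 0) {
        throw new TimeoutException("condition not met within " + timeout);
      }
      Thread.sleep(Math.max(1, pollInterval.toMillis()));
    }
  }
}
```

In a test, the condition would be something like a hypothetical `() -> replica.isLeader()`, with a 30-second timeout mirroring `kTimeout` in the patch. Failing loudly on timeout, rather than proceeding, turns a flaky write failure into an explicit "never became leader" error.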


[kudu] 02/07: KUDU-2710: Fix KeepAliveRequest retries

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 6302811eb73efdfd2a3da84c25f5d6589302dee1
Author: Grant Henke <gr...@apache.org>
AuthorDate: Mon Feb 25 21:17:01 2019 -0600

    KUDU-2710: Fix KeepAliveRequest retries
    
    Fixes KeepAliveRequest retries by adding a partitionKey
    implementation. Without it, a null partitionKey is passed
    and the client treats the request as a master table request.
    
    A follow-up patch should include fixes to prevent issues
    like this in the future and fix any remaining retry issues.
    This patch is kept small to ensure easy backports.
    
    Change-Id: I951212ab76079e5788c2870223b45782b16509e7
    Reviewed-on: http://gerrit.cloudera.org:8080/12586
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
---
 .../org/apache/kudu/client/AsyncKuduScanner.java    |  6 ++++++
 .../main/java/org/apache/kudu/client/RpcProxy.java  | 21 +++++++++++++++++++++
 .../java/org/apache/kudu/client/TestKuduClient.java | 10 +++++++++-
 3 files changed, 36 insertions(+), 1 deletion(-)

diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index d31c0a2..15668d1 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -863,6 +863,12 @@ public final class AsyncKuduScanner {
     }
 
     @Override
+    public byte[] partitionKey() {
+      // This key is used to lookup where the request needs to go
+      return pruner.nextPartitionKey();
+    }
+
+    @Override
     Pair<Void, Object> deserialize(final CallResponse callResponse,
                                    String tsUUID) throws KuduException {
       ScannerKeepAliveResponsePB.Builder builder = ScannerKeepAliveResponsePB.newBuilder();
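The commit message says that a null `partitionKey` makes the client treat KeepAliveRequest as a request for the master table. That routing decision can be pictured with the toy model below. `TabletRouter`, `MASTER`, and the string tablet IDs are hypothetical and only approximate the client's real tablet-location lookup: the owning tablet is found with `TreeMap.floorEntry`, i.e. the tablet with the greatest partition start key not exceeding the request's key.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of how a client picks a destination for an
// RPC from its partition key. The real Kudu client resolves and caches tablet
// locations asynchronously; this only shows why a null key misroutes a request.
final class TabletRouter {
  static final String MASTER = "<master>";

  // Tablets indexed by partition start key, compared as unsigned bytes.
  private final TreeMap<byte[], String> tabletsByStartKey =
      new TreeMap<>(TabletRouter::compareUnsigned);

  void addTablet(byte[] startKey, String tabletId) {
    tabletsByStartKey.put(startKey, tabletId);
  }

  /** Returns the destination for a request whose partition key is {@code partitionKey}. */
  String route(byte[] partitionKey) {
    if (partitionKey == null) {
      // A null key looks exactly like a master-table request: the failure
      // mode KUDU-2710 describes for KeepAliveRequest.
      return MASTER;
    }
    // The owning tablet has the greatest start key <= partitionKey.
    Map.Entry<byte[], String> owner = tabletsByStartKey.floorEntry(partitionKey);
    return owner == null ? null : owner.getValue();
  }

  private static int compareUnsigned(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(a.length, b.length);
  }
}
```

In the patch itself, the key comes from `pruner.nextPartitionKey()`, so a retried keep-alive is looked up against the same tablet that is being scanned.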
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
index 105ba22..d95a27c 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
@@ -60,6 +60,9 @@ class RpcProxy {
 
   private static final Logger LOG = LoggerFactory.getLogger(RpcProxy.class);
 
+  private static int staticNumFail = 0;
+  private static Exception staticException = null;
+
   /** The reference to the top-level Kudu client object. */
   @Nonnull
   private final AsyncKuduClient client;
@@ -90,6 +93,18 @@ class RpcProxy {
   }
 
   /**
+   * Fails the next numFail RPCs by throwing the passed exception.
+   * @param numFail the number of RPCs to fail
+   * @param exception the exception to throw when failing an rpc
+   */
+  @InterfaceAudience.LimitedPrivate("Test")
+  static void failNextRpcs(int numFail, Exception exception) {
+    Preconditions.checkNotNull(exception);
+    staticNumFail = numFail;
+    staticException = exception;
+  }
+
+  /**
    * Send the specified RPC using the connection to the Kudu server.
    *
    * @param <R> type of the RPC
@@ -101,6 +116,12 @@ class RpcProxy {
                           final Connection connection,
                           final KuduRpc<R> rpc) {
     try {
+      // Throw an exception to enable testing failures. See `failNextRpcs`.
+      if (staticNumFail > 0) {
+        staticNumFail--;
+        LOG.warn("Forcing a failure on sendRpc: " + rpc);
+        throw staticException;
+      }
       if (!rpc.getRequiredFeatures().isEmpty()) {
         // An extra optimization: when the peer's features are already known, check that the server
         // supports feature flags, if those are required.
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
index 303f53a..576ec88 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
@@ -47,6 +47,7 @@ import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -55,10 +56,11 @@ import java.util.concurrent.Future;
 import com.google.common.collect.ImmutableList;
 import com.stumbleupon.async.Deferred;
 
+import org.apache.kudu.test.ClientTestUtil;
 import org.apache.kudu.test.KuduTestHarness;
 import org.apache.kudu.test.KuduTestHarness.LocationConfig;
 import org.apache.kudu.test.KuduTestHarness.TabletServerConfig;
-import org.apache.kudu.test.ClientTestUtil;
+import org.apache.kudu.test.RandomUtils;
 import org.apache.kudu.util.TimestampUtil;
 import org.junit.Before;
 import org.junit.Rule;
@@ -286,8 +288,14 @@ public class TestKuduClient {
     // Wait for longer than the scanner ttl calling keepAlive throughout.
     // Each loop sleeps 25% of the scanner ttl and we loop 10 times to ensure
     // we extend over 2x the scanner ttl.
+    Random random = RandomUtils.getRandom();
     for (int i = 0; i < 10; i++) {
       Thread.sleep(SHORT_SCANNER_TTL_MS / 4);
+      // Force 1/3 of the keepAlive requests to retry up to 3 times.
+      if (i % 3 == 0) {
+        RpcProxy.failNextRpcs(random.nextInt(4),
+            new RecoverableException(Status.ServiceUnavailable("testKeepAlive")));
+      }
       scanner.keepAlive();
     }
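`failNextRpcs` keeps its state in plain static fields that are neither volatile nor updated atomically, which is presumably acceptable for a test-only hook. Below is a sketch of the same hook built on `AtomicInteger`, paired with a bounded retry loop resembling what the keepAlive test exercises (up to three injected failures per call). `FaultInjector`, `RetryingSender`, and `InjectedRecoverableError` are hypothetical names, and unlike a real client the loop retries immediately, with no backoff.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for a recoverable error such as ServiceUnavailable.
final class InjectedRecoverableError extends RuntimeException {
  InjectedRecoverableError(String message) {
    super(message);
  }
}

// Thread-safe variant of a "fail the next N RPCs" test hook.
final class FaultInjector {
  private static final AtomicInteger remainingFailures = new AtomicInteger();

  private FaultInjector() {}

  static void failNext(int numFail) {
    if (numFail < 0) {
      throw new IllegalArgumentException("numFail must be >= 0");
    }
    remainingFailures.set(numFail);
  }

  /** Throws if an injected failure is pending; each one fires exactly once. */
  static void maybeFail(String rpcName) {
    // getAndUpdate never lets the counter drop below zero, even with concurrent senders.
    if (remainingFailures.getAndUpdate(n -> n > 0 ? n - 1 : 0) > 0) {
      throw new InjectedRecoverableError("forcing a failure on " + rpcName);
    }
  }
}

// Hypothetical bounded retry loop: recoverable errors are retried until the
// attempt budget runs out.
final class RetryingSender {
  private RetryingSender() {}

  static <T> T send(String rpcName, Supplier<T> rpc, int maxAttempts) {
    InjectedRecoverableError lastError = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        FaultInjector.maybeFail(rpcName);
        return rpc.get();
      } catch (InjectedRecoverableError e) {
        lastError = e; // recoverable: try again
      }
    }
    throw new IllegalStateException(
        rpcName + " failed after " + maxAttempts + " attempts", lastError);
  }
}
```

With at most three injected failures, any attempt budget of four or more lets the call succeed, which is the property the randomized keepAlive test checks.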
 


[kudu] 06/07: Get rid of some uninteresting INFO log messages

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 3698f387c46372eb47ef4d2d4f7c48dacc2e39c3
Author: Will Berkeley <wd...@gmail.org>
AuthorDate: Tue Feb 26 15:52:59 2019 -0800

    Get rid of some uninteresting INFO log messages
    
    When a tablet shuts down because it's being deleted, it unregisters all
    of its maintenance ops. We log one line for each op. Let's not do that
    (except at verbose level).
    
    Similarly, whenever a leader loses leadership it closes its tracking of
    peers. We logged one line per peer each time this happened, even though
    it is a banal part of relinquishing leadership. Let's not do that either
    (except at verbose level).
    
    Change-Id: I3293c0ddc96140423900f2cf4466285b0e62d418
    Reviewed-on: http://gerrit.cloudera.org:8080/12611
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
    Tested-by: Kudu Jenkins
---
 src/kudu/consensus/consensus_peers.cc | 2 +-
 src/kudu/util/maintenance_manager.cc  | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/kudu/consensus/consensus_peers.cc b/src/kudu/consensus/consensus_peers.cc
index 8d6b5e7..e3a30be 100644
--- a/src/kudu/consensus/consensus_peers.cc
+++ b/src/kudu/consensus/consensus_peers.cc
@@ -479,7 +479,7 @@ void Peer::Close() {
     if (closed_) return;
     closed_ = true;
   }
-  LOG_WITH_PREFIX_UNLOCKED(INFO) << "Closing peer: " << peer_pb_.permanent_uuid();
+  VLOG_WITH_PREFIX_UNLOCKED(1) << "Closing peer: " << peer_pb_.permanent_uuid();
 
   queue_->UntrackPeer(peer_pb_.permanent_uuid());
 }
diff --git a/src/kudu/util/maintenance_manager.cc b/src/kudu/util/maintenance_manager.cc
index d7689de..ea607aa 100644
--- a/src/kudu/util/maintenance_manager.cc
+++ b/src/kudu/util/maintenance_manager.cc
@@ -236,7 +236,7 @@ void MaintenanceManager::UnregisterOp(MaintenanceOp* op) {
     }
     ops_.erase(iter);
   }
-  LOG_WITH_PREFIX(INFO) << "Unregistered op " << op->name();
+  VLOG_WITH_PREFIX(1) << "Unregistered op " << op->name();
   op->cond_.reset();
   // Remove the op's shared_ptr reference to us. This might 'delete this'.
   op->manager_.reset();
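Both messages move from `LOG(INFO)` to `VLOG(1)`, which glog emits only when verbose logging is enabled at level 1 or higher (for example with `--v=1`), so they disappear from default logs. The gating logic can be sketched in Java as follows; `VerboseLog` is hypothetical and does not reflect how glog or Kudu implement logging.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical verbosity-gated logger modelled loosely on glog's VLOG(n):
// a message at level n is recorded only when the configured verbosity >= n.
final class VerboseLog {
  private final int verbosity;
  private final List<String> lines = new ArrayList<>();

  VerboseLog(int verbosity) {
    this.verbosity = verbosity;
  }

  /** Always recorded, like LOG(INFO). */
  void info(String message) {
    lines.add("I " + message);
  }

  /** Recorded only if verbosity >= level; the message is built lazily. */
  void vlog(int level, Supplier<String> message) {
    if (verbosity >= level) {
      lines.add("V" + level + " " + message.get());
    }
  }

  List<String> lines() {
    return lines;
  }
}
```

Taking a `Supplier` means the message string is never built when the level is disabled, which matters for per-op or per-peer messages emitted in bulk during tablet deletion or leadership changes.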


[kudu] 07/07: KUDU-1868 Part 2: Eliminate socket read timeout except for negotiation

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 853d3bfd1f5c5c00a069da3b04706e4ef0914b7d
Author: Will Berkeley <wd...@gmail.org>
AuthorDate: Mon Feb 4 18:04:50 2019 -0800

    KUDU-1868 Part 2: Eliminate socket read timeout except for negotiation
    
    This removes all use of socket read timeouts from the Java client,
    except to time out negotiation, and deprecates all public APIs
    involving the socket read timeout. Keeping a read timeout for
    negotiation is OK because negotiation is the only activity on the
    socket while it is in progress. Once negotiation succeeds, the socket
    read timeout handler is removed from the pipeline, and the timeout
    tasks from Part 1 handle RPC timeouts.
    
    The C++ client does not allow setting the negotiation timeout, and
    exposing it in the Java client didn't seem useful, so I hardcoded the
    timeout to 10000ms. That is much larger than the C++ client's default
    of 3000ms, which matches the server-side default; the extra margin
    leaves room for the server-side default to rise. I wrote a test case
    and checked that negotiation did time out when the constant was
    lowered to a small value for the purposes of the test.
    
    Change-Id: I391374dd72b6f4a91a9f69cf34758703afbdc59e
    Reviewed-on: http://gerrit.cloudera.org:8080/12363
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
    Tested-by: Kudu Jenkins
---
 .../scala/org/apache/kudu/backup/KuduBackup.scala  |  6 +--
 .../org/apache/kudu/client/AsyncKuduClient.java    | 28 +++++-----
 .../java/org/apache/kudu/client/Connection.java    | 31 +++++------
 .../org/apache/kudu/client/ConnectionCache.java    | 12 ++---
 .../java/org/apache/kudu/client/KuduClient.java    | 11 ++--
 .../test/java/org/apache/kudu/client/ITClient.java | 28 ++--------
 .../java/org/apache/kudu/client/TestTimeouts.java  | 61 ++++++++++++++--------
 .../apache/kudu/mapreduce/CommandLineParser.java   | 13 +++--
 .../org/apache/kudu/spark/kudu/DefaultSource.scala |  7 ++-
 .../org/apache/kudu/spark/kudu/KuduContext.scala   | 22 +++-----
 .../apache/kudu/spark/kudu/KuduReadOptions.scala   |  2 +-
 .../apache/kudu/spark/kudu/DefaultSourceTest.scala | 20 -------
 12 files changed, 98 insertions(+), 143 deletions(-)

diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
index 5a30f15..3280c0d 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
@@ -42,11 +42,7 @@ object KuduBackup {
     val context =
       new KuduContext(
         options.kuduMasterAddresses,
-        session.sparkContext,
-        // TODO: As a workaround for KUDU-1868 the socketReadTimeout is
-        // matched to the scanRequestTimeout. Without this
-        // "Invalid call sequence ID" errors can occur under heavy load.
-        Some(options.scanRequestTimeoutMs)
+        session.sparkContext
       )
     val path = options.path
     log.info(s"Backing up to path: $path")
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index ae2482f..20949a3 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -263,7 +263,6 @@ public class AsyncKuduClient implements AutoCloseable {
   public static final byte[] EMPTY_ARRAY = new byte[0];
   public static final long NO_TIMESTAMP = -1;
   public static final long DEFAULT_OPERATION_TIMEOUT_MS = 30000;
-  public static final long DEFAULT_SOCKET_READ_TIMEOUT_MS = 10000;
   private static final long MAX_RPC_ATTEMPTS = 100;
 
   /**
@@ -344,8 +343,6 @@ public class AsyncKuduClient implements AutoCloseable {
 
   private final long defaultAdminOperationTimeoutMs;
 
-  private final long defaultSocketReadTimeoutMs;
-
   private final Statistics statistics;
 
   private final boolean statisticsDisabled;
@@ -367,7 +364,6 @@ public class AsyncKuduClient implements AutoCloseable {
         MASTER_TABLE_NAME_PLACEHOLDER, null, null, 1);
     this.defaultOperationTimeoutMs = b.defaultOperationTimeoutMs;
     this.defaultAdminOperationTimeoutMs = b.defaultAdminOperationTimeoutMs;
-    this.defaultSocketReadTimeoutMs = b.defaultSocketReadTimeoutMs;
     this.statisticsDisabled = b.statisticsDisabled;
     this.statistics = statisticsDisabled ? null : new Statistics();
     this.timer = b.timer;
@@ -375,7 +371,7 @@ public class AsyncKuduClient implements AutoCloseable {
 
     this.securityContext = new SecurityContext();
     this.connectionCache = new ConnectionCache(
-        securityContext, defaultSocketReadTimeoutMs, timer, channelFactory);
+        securityContext, timer, channelFactory);
     this.tokenReacquirer = new AuthnTokenReacquirer(this);
   }
 
@@ -1039,12 +1035,14 @@ public class AsyncKuduClient implements AutoCloseable {
   }
 
   /**
-   * Get the timeout used when waiting to read data from a socket. Will be triggered when nothing
-   * has been read on a socket connected to a tablet server for {@code timeout} milliseconds.
+   * Socket read timeouts are no longer used in the Java client and have no effect.
+   * This method always returns 0, as that previously indicated no socket read timeout.
    * @return a timeout in milliseconds
+   * @deprecated socket read timeouts are no longer used
    */
-  public long getDefaultSocketReadTimeoutMs() {
-    return defaultSocketReadTimeoutMs;
+  @Deprecated public long getDefaultSocketReadTimeoutMs() {
+    LOG.info("getDefaultSocketReadTimeoutMs is deprecated");
+    return 0;
   }
 
   /**
@@ -2440,7 +2438,6 @@ public class AsyncKuduClient implements AutoCloseable {
     private final List<HostAndPort> masterAddresses;
     private long defaultAdminOperationTimeoutMs = DEFAULT_OPERATION_TIMEOUT_MS;
     private long defaultOperationTimeoutMs = DEFAULT_OPERATION_TIMEOUT_MS;
-    private long defaultSocketReadTimeoutMs = DEFAULT_SOCKET_READ_TIMEOUT_MS;
 
     private final HashedWheelTimer timer =
         new HashedWheelTimer(new ThreadFactoryBuilder().setDaemon(true).build(), 20, MILLISECONDS);
@@ -2512,15 +2509,14 @@ public class AsyncKuduClient implements AutoCloseable {
     }
 
     /**
-     * Sets the default timeout to use when waiting on data from a socket.
-     * Optional.
-     * If not provided, defaults to 10s.
-     * A value of 0 disables the timeout.
+     * Socket read timeouts are no longer used in the Java client and have no effect.
+     * Setting this has no effect.
      * @param timeoutMs a timeout in milliseconds
      * @return this builder
+     * @deprecated this option no longer has any effect
      */
-    public AsyncKuduClientBuilder defaultSocketReadTimeoutMs(long timeoutMs) {
-      this.defaultSocketReadTimeoutMs = timeoutMs;
+    @Deprecated public AsyncKuduClientBuilder defaultSocketReadTimeoutMs(long timeoutMs) {
+      LOG.info("defaultSocketReadTimeoutMs is deprecated");
       return this;
     }
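Rather than removing `defaultSocketReadTimeoutMs` outright, the patch keeps the builder methods and the getter but turns them into deprecated no-ops, so existing callers still compile. Here is a stripped-down sketch of that pattern. `ExampleClientBuilder` is hypothetical, and its 30000 ms default only mirrors `DEFAULT_OPERATION_TIMEOUT_MS` from the diff.

```java
import java.util.logging.Logger;

// Hypothetical builder showing the deprecation pattern used for
// defaultSocketReadTimeoutMs: the method stays for source compatibility,
// but its argument is ignored.
final class ExampleClientBuilder {
  private static final Logger LOG = Logger.getLogger(ExampleClientBuilder.class.getName());

  private long operationTimeoutMs = 30000;

  ExampleClientBuilder operationTimeoutMs(long timeoutMs) {
    this.operationTimeoutMs = timeoutMs;
    return this;
  }

  /**
   * @deprecated socket read timeouts are no longer used; this call has no effect.
   */
  @Deprecated
  ExampleClientBuilder socketReadTimeoutMs(long timeoutMs) {
    LOG.info("socketReadTimeoutMs is deprecated and ignored");
    return this; // still return the builder so existing call chains compile and run
  }

  long operationTimeoutMs() {
    return operationTimeoutMs;
  }

  /** @deprecated always 0, which used to mean "no socket read timeout". */
  @Deprecated
  long socketReadTimeoutMs() {
    return 0;
  }
}
```

Returning `this` from the no-op setter is what keeps chained calls such as `builder.socketReadTimeoutMs(500).build()` working unchanged after the upgrade.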
 
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
index 27c5d8f..ba188f0 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
@@ -75,8 +75,6 @@ import org.apache.kudu.rpc.RpcHeader.RpcFeatureFlag;
  * Acquiring the monitor on an object of this class will prevent it from
  * accepting write requests as well as buffering requests if the underlying
  * channel isn't connected.
- *
- * TODO(aserbin) clarify on the socketReadTimeoutMs and using per-RPC timeout settings.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
@@ -106,9 +104,6 @@ class Connection extends SimpleChannelUpstreamHandler {
   /** Security context to use for connection negotiation. */
   private final SecurityContext securityContext;
 
-  /** Read timeout for the connection (used by Netty's ReadTimeoutHandler). */
-  private final long socketReadTimeoutMs;
-
   /** Timer to monitor read timeouts for the connection (used by Netty's ReadTimeoutHandler). */
   private final HashedWheelTimer timer;
 
@@ -136,6 +131,9 @@ class Connection extends SimpleChannelUpstreamHandler {
       0
   };
 
+  private static final String NEGOTIATION_TIMEOUT_HANDLER = "negotiation-timeout-handler";
+  private static final long NEGOTIATION_TIMEOUT_MS = 10000;
+
   /** Lock to guard access to some of the fields below. */
   private final ReentrantLock lock = new ReentrantLock();
 
@@ -178,7 +176,6 @@ class Connection extends SimpleChannelUpstreamHandler {
    *
    * @param serverInfo the destination server
    * @param securityContext security context to use for connection negotiation
-   * @param socketReadTimeoutMs timeout for the read operations on the socket
    * @param timer timer to set up read timeout on the corresponding Netty channel
    * @param channelFactory Netty factory to create corresponding Netty channel
    * @param credentialsPolicy policy controlling which credentials to use while negotiating on the
@@ -188,14 +185,12 @@ class Connection extends SimpleChannelUpstreamHandler {
    */
   Connection(ServerInfo serverInfo,
              SecurityContext securityContext,
-             long socketReadTimeoutMs,
              HashedWheelTimer timer,
              ClientSocketChannelFactory channelFactory,
              CredentialsPolicy credentialsPolicy) {
     this.serverInfo = serverInfo;
     this.securityContext = securityContext;
     this.state = State.NEW;
-    this.socketReadTimeoutMs = socketReadTimeoutMs;
     this.timer = timer;
     this.credentialsPolicy = credentialsPolicy;
 
@@ -319,6 +314,10 @@ class Connection extends SimpleChannelUpstreamHandler {
         Preconditions.checkState(state == State.NEGOTIATING);
 
         queuedMessages = null;
+
+        // Drop the negotiation timeout handler from the pipeline.
+        ctx.getPipeline().remove(NEGOTIATION_TIMEOUT_HANDLER);
+
         // Set the state to READY -- that means the incoming messages should be no longer put into
         // the queuedMessages, but sent to wire right away (see the enqueueMessage() for details).
         state = State.READY;
@@ -532,11 +531,9 @@ class Connection extends SimpleChannelUpstreamHandler {
       headerBuilder.setCallId(callId);
 
       // Amend the timeout for the call, if necessary.
-      if (socketReadTimeoutMs > 0) {
-        final int timeoutMs = headerBuilder.getTimeoutMillis();
-        if (timeoutMs > 0) {
-          headerBuilder.setTimeoutMillis((int) Math.min(timeoutMs, socketReadTimeoutMs));
-        }
+      final int timeoutMs = headerBuilder.getTimeoutMillis();
+      if (timeoutMs > 0) {
+        headerBuilder.setTimeoutMillis(timeoutMs);
       }
 
       // If the connection hasn't been negotiated yet, add the message into the queuedMessages list.
@@ -801,10 +798,10 @@ class Connection extends SimpleChannelUpstreamHandler {
           4 /* strip the length prefix */));
       super.addLast("decode-inbound", new CallResponse.Decoder());
       super.addLast("encode-outbound", new RpcOutboundMessage.Encoder());
-      if (Connection.this.socketReadTimeoutMs > 0) {
-        super.addLast("timeout-handler", new ReadTimeoutHandler(
-            Connection.this.timer, Connection.this.socketReadTimeoutMs, TimeUnit.MILLISECONDS));
-      }
+      // Add a socket read timeout handler to function as a timeout for negotiation.
+      // The handler will be removed once the connection is negotiated.
+      super.addLast(NEGOTIATION_TIMEOUT_HANDLER, new ReadTimeoutHandler(
+          Connection.this.timer, NEGOTIATION_TIMEOUT_MS, TimeUnit.MILLISECONDS));
       super.addLast("kudu-handler", Connection.this);
     }
   }
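The new pipeline installs a read-timeout handler named `negotiation-timeout-handler` (10000 ms) when the channel is set up and removes it once negotiation succeeds; after that point only the per-RPC timeout tasks from Part 1 apply. That lifecycle can be approximated with plain `java.util.concurrent` in place of Netty and `HashedWheelTimer`. `NegotiatingConnection` and its methods are hypothetical, and the sketch uses a single fixed deadline for simplicity.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical connection whose only timer-based timeout covers negotiation.
// A fixed deadline is used here; Netty's ReadTimeoutHandler instead fires
// only after the channel has gone the whole period without reading any data.
final class NegotiatingConnection {
  enum State { NEGOTIATING, READY, TIMED_OUT }

  private State state = State.NEGOTIATING;
  private ScheduledFuture<?> negotiationTimeout;

  /** Arms the timeout, analogous to adding the handler when the pipeline is built. */
  synchronized void startNegotiation(ScheduledExecutorService timer, long timeoutMs) {
    negotiationTimeout =
        timer.schedule(this::onNegotiationTimeout, timeoutMs, TimeUnit.MILLISECONDS);
  }

  /** Disarms the timeout, analogous to removing the handler after negotiation. */
  synchronized void onNegotiationComplete() {
    if (state != State.NEGOTIATING) {
      return; // too late: the timeout already fired
    }
    if (negotiationTimeout != null) {
      negotiationTimeout.cancel(false);
    }
    state = State.READY; // from here on, per-RPC timeout tasks would take over
  }

  private synchronized void onNegotiationTimeout() {
    if (state == State.NEGOTIATING) {
      state = State.TIMED_OUT; // a real client would close the channel here
    }
  }

  synchronized State state() {
    return state;
  }
}
```

Because the timeout is disarmed as soon as negotiation completes, a long-running scan that legitimately receives no bytes for a while can no longer trip a socket-level timeout; only its own RPC deadline applies.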
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
index 2345b12..85be2cd 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
@@ -55,9 +55,6 @@ class ConnectionCache {
   /** Security context to use for connection negotiation. */
   private final SecurityContext securityContext;
 
-  /** Read timeout for connections (used by Netty's ReadTimeoutHandler) */
-  private final long socketReadTimeoutMs;
-
   /** Timer to monitor read timeouts for connections (used by Netty's ReadTimeoutHandler) */
   private final HashedWheelTimer timer;
 
@@ -75,11 +72,9 @@ class ConnectionCache {
 
   /** Create a new empty ConnectionCache given the specified parameters. */
   ConnectionCache(SecurityContext securityContext,
-                  long socketReadTimeoutMs,
                   HashedWheelTimer timer,
                   ClientSocketChannelFactory channelFactory) {
     this.securityContext = securityContext;
-    this.socketReadTimeoutMs = socketReadTimeoutMs;
     this.timer = timer;
     this.channelFactory = channelFactory;
   }
@@ -131,8 +126,11 @@ class ConnectionCache {
         }
       }
       if (result == null) {
-        result = new Connection(serverInfo, securityContext,
-            socketReadTimeoutMs, timer, channelFactory, credentialsPolicy);
+        result = new Connection(serverInfo,
+                                securityContext,
+                                timer,
+                                channelFactory,
+                                credentialsPolicy);
         connections.add(result);
         // There can be at most 2 connections to the same destination: one with primary and another
         // with secondary credentials.
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
index 1e14a8f..1ca2bbe 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
@@ -468,15 +468,14 @@ public class KuduClient implements AutoCloseable {
     }
 
     /**
-     * Sets the default timeout to use when waiting on data from a socket.
-     * Optional.
-     * If not provided, defaults to 10s.
-     * A value of 0 disables the timeout.
+     * Socket read timeouts are no longer used in the Java client and have no effect.
+     * Setting this has no effect.
      * @param timeoutMs a timeout in milliseconds
      * @return this builder
+     * @deprecated socket read timeouts are no longer used
      */
-    public KuduClientBuilder defaultSocketReadTimeoutMs(long timeoutMs) {
-      clientBuilder.defaultSocketReadTimeoutMs(timeoutMs);
+    @Deprecated public KuduClientBuilder defaultSocketReadTimeoutMs(long timeoutMs) {
+      LOG.info("defaultSocketReadTimeoutMs is deprecated");
       return this;
     }
 
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
index e28c1f2..e03ed4f 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
@@ -64,8 +64,6 @@ public class ITClient {
   // Latch used to track if an error occurred and we need to stop the test early.
   private final CountDownLatch errorLatch = new CountDownLatch(1);
 
-  private KuduClient localClient;
-  private AsyncKuduClient localAsyncClient;
   private KuduTable table;
   private long runtimeInSeconds;
 
@@ -87,25 +85,9 @@ public class ITClient {
 
     LOG.info ("Test running for {} seconds", runtimeInSeconds);
 
-    // Client we're using has low tolerance for read timeouts but a
-    // higher overall operation timeout.
-    localAsyncClient = new AsyncKuduClient.AsyncKuduClientBuilder(harness.getMasterAddressesAsString())
-        .defaultSocketReadTimeoutMs(500)
-        .build();
-    localClient = new KuduClient(localAsyncClient);
-
     CreateTableOptions builder = new CreateTableOptions().setNumReplicas(3);
     builder.setRangePartitionColumns(ImmutableList.of("key"));
-    table = localClient.createTable(TABLE_NAME, getBasicSchema(), builder);
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    if (localClient != null) {
-      localClient.shutdown();
-      // No need to explicitly shutdown the async client,
-      // shutting down the sync client effectively does that.
-    }
+    table = harness.getClient().createTable(TABLE_NAME, getBasicSchema(), builder);
   }
 
   @Test(timeout = TEST_TIMEOUT_SECONDS)
@@ -139,7 +121,7 @@ public class ITClient {
       thread.join(DEFAULT_SLEEP);
     }
 
-    AsyncKuduScanner scannerBuilder = localAsyncClient.newScannerBuilder(table).build();
+    AsyncKuduScanner scannerBuilder = harness.getAsyncClient().newScannerBuilder(table).build();
     int rowCount = countRowsInScan(scannerBuilder);
     Assert.assertTrue(rowCount + " should be higher than 0", rowCount > 0);
   }
@@ -199,7 +181,7 @@ public class ITClient {
      */
     private boolean disconnectNode() {
       try {
-        final List<Connection> connections = localAsyncClient.getConnectionListCopy();
+        final List<Connection> connections = harness.getAsyncClient().getConnectionListCopy();
         if (connections.isEmpty()) {
           return true;
         }
@@ -252,7 +234,7 @@ public class ITClient {
    */
   class WriterThread implements Runnable {
 
-    private final KuduSession session = localClient.newSession();
+    private final KuduSession session = harness.getClient().newSession();
     private final Random random = new Random();
     private int currentRowKey = 0;
 
@@ -432,7 +414,7 @@ public class ITClient {
     }
 
     private KuduScanner.KuduScannerBuilder getScannerBuilder() {
-      return localClient.newScannerBuilder(table)
+      return harness.getClient().newScannerBuilder(table)
           .readMode(AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT)
           .snapshotTimestampRaw(sharedWriteTimestamp)
           .setFaultTolerant(true);
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java
index 1d5d2c8..9b09fde 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java
@@ -24,7 +24,9 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import org.apache.kudu.test.KuduTestHarness;
+import org.apache.kudu.test.KuduTestHarness.MasterServerConfig;
 import org.apache.kudu.test.KuduTestHarness.TabletServerConfig;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -94,32 +96,47 @@ public class TestTimeouts {
         .apply(createBasicSchemaInsert(table, 0))
         .hasRowError());
 
-    // Create a new client with no socket read timeout (0 means do not set a read timeout).
-    try (KuduClient noRecvTimeoutClient =
+    // Do a non-scan operation to cache information from the master.
+    client.getTablesList();
+
+    // Scan with a short timeout.
+    KuduScanner scanner = client
+        .newScannerBuilder(table)
+        .scanRequestTimeout(1000)
+        .build();
+
+    // The server will not respond for the lifetime of the test, so we expect the
+    // operation to time out.
+    try {
+      scanner.nextRows();
+      fail("should not have completed nextRows");
+    } catch (NonRecoverableException e) {
+      assertTrue(e.getStatus().isTimedOut());
+    }
+  }
+
+  /**
+   * KUDU-1868: This tests that negotiation can time out on the client side. It passes if the
+   * hardcoded negotiation timeout is lowered to 500ms. In general it is hard to get it to work
+   * right because injecting latency to negotiation server side affects all client connections,
+   * including the harness's Java client, the kudu tool used to create the test cluster, and the
+   * other members of the test cluster. There isn't a way to configure the kudu tool's
+   * negotiation timeout within a Java test, presently.
+   */
+  @Test(timeout = 100000)
+  @Ignore
+  @MasterServerConfig(flags = { "--rpc_negotiation_inject_delay_ms=1000" })
+  public void testClientNegotiationTimeout() throws Exception {
+    // Make a new client so we can turn down the operation timeout-- otherwise this test takes 50s!
+    try (KuduClient lowTimeoutsClient =
              new KuduClient.KuduClientBuilder(harness.getMasterAddressesAsString())
-                 .defaultSocketReadTimeoutMs(0)
+                 .defaultAdminOperationTimeoutMs(5000)
                  .build()) {
-      // Propagate the timestamp to be sure we should see the row that was
-      // inserted by another client.
-      noRecvTimeoutClient.updateLastPropagatedTimestamp(client.getLastPropagatedTimestamp());
-      KuduTable noRecvTimeoutTable = noRecvTimeoutClient.openTable(TABLE_NAME);
-
-      // Do something besides a scan to cache table and tablet lookup.
-      noRecvTimeoutClient.getTablesList();
-
-      // Scan with a short timeout.
-      KuduScanner scanner = noRecvTimeoutClient
-          .newScannerBuilder(noRecvTimeoutTable)
-          .scanRequestTimeout(1000)
-          .build();
-
-      // The server will not respond for the lifetime of the test, so we expect
-      // the operation to time out.
       try {
-        scanner.nextRows();
-        fail("should not have completed nextRows");
+        lowTimeoutsClient.getTablesList();
+        fail("should not have completed getTablesList");
       } catch (NonRecoverableException e) {
-        assertTrue(e.getStatus().isTimedOut());
+        // Good.
       }
     }
   }
diff --git a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/CommandLineParser.java b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/CommandLineParser.java
index ba9d987..6d88b0a 100644
--- a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/CommandLineParser.java
+++ b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/CommandLineParser.java
@@ -47,8 +47,7 @@ public class CommandLineParser {
       AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS;
   public static final String ADMIN_OPERATION_TIMEOUT_MS_KEY = "kudu.admin.operation.timeout.ms";
   public static final String SOCKET_READ_TIMEOUT_MS_KEY = "kudu.socket.read.timeout.ms";
-  public static final long SOCKET_READ_TIMEOUT_MS_DEFAULT =
-      AsyncKuduClient.DEFAULT_SOCKET_READ_TIMEOUT_MS;
+  public static final long SOCKET_READ_TIMEOUT_MS_DEFAULT = 0;
   public static final String NUM_REPLICAS_KEY = "kudu.num.replicas";
   public static final int NUM_REPLICAS_DEFAULT = 3;
 
@@ -86,11 +85,13 @@ public class CommandLineParser {
   }
 
   /**
-   * Get the configured timeout for socket reads.
+   * Socket read timeouts are no longer used in the Java client.
+   * This method always returns 0, which previously indicated no socket read timeout.
    * @return a long that represents the passed timeout, or the default value
+   * @deprecated socket read timeouts no longer have any effect
    */
-  public long getSocketReadTimeoutMs() {
-    return conf.getLong(SOCKET_READ_TIMEOUT_MS_KEY, SOCKET_READ_TIMEOUT_MS_DEFAULT);
+  @Deprecated public long getSocketReadTimeoutMs() {
+    return 0;
   }
 
   /**
@@ -109,7 +110,6 @@ public class CommandLineParser {
     return new AsyncKuduClient.AsyncKuduClientBuilder(getMasterAddresses())
         .defaultOperationTimeoutMs(getOperationTimeoutMs())
         .defaultAdminOperationTimeoutMs(getAdminOperationTimeoutMs())
-        .defaultSocketReadTimeoutMs(getSocketReadTimeoutMs())
         .build();
   }
 
@@ -121,7 +121,6 @@ public class CommandLineParser {
     KuduClient c = new KuduClient.KuduClientBuilder(getMasterAddresses())
         .defaultOperationTimeoutMs(getOperationTimeoutMs())
         .defaultAdminOperationTimeoutMs(getAdminOperationTimeoutMs())
-        .defaultSocketReadTimeoutMs(getSocketReadTimeoutMs())
         .build();
     KuduTableMapReduceUtil.importCredentialsFromCurrentSubject(c);
     return c;
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
index 388fac1..47b5ddd 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
@@ -175,7 +175,6 @@ class DefaultSource
     val scanLocality =
       parameters.get(SCAN_LOCALITY).map(getScanLocalityType).getOrElse(defaultScanLocality)
     val scanRequestTimeoutMs = parameters.get(SCAN_REQUEST_TIMEOUT_MS).map(_.toLong)
-    val socketReadTimeoutMs = parameters.get(SOCKET_READ_TIMEOUT_MS).map(_.toLong)
     val keepAlivePeriodMs =
       parameters.get(KEEP_ALIVE_PERIOD_MS).map(_.toLong).getOrElse(defaultKeepAlivePeriodMs)
 
@@ -185,7 +184,7 @@ class DefaultSource
       faultTolerantScanner,
       keepAlivePeriodMs,
       scanRequestTimeoutMs,
-      socketReadTimeoutMs)
+      /* socketReadTimeoutMs= */ None)
   }
 
   private def getWriteOptions(parameters: Map[String, String]): KuduWriteOptions = {
@@ -253,7 +252,7 @@ class KuduRelation(
     extends BaseRelation with PrunedFilteredScan with InsertableRelation {
 
   private val context: KuduContext =
-    new KuduContext(masterAddrs, sqlContext.sparkContext, readOptions.socketReadTimeoutMs)
+    new KuduContext(masterAddrs, sqlContext.sparkContext)
 
   private val table: KuduTable = context.syncClient.openTable(tableName)
 
@@ -466,7 +465,7 @@ class KuduSink(
     extends Sink {
 
   private val context: KuduContext =
-    new KuduContext(masterAddrs, sqlContext.sparkContext, readOptions.socketReadTimeoutMs)
+    new KuduContext(masterAddrs, sqlContext.sparkContext)
 
   override def addBatch(batchId: Long, data: DataFrame): Unit = {
     context.writeRows(data, tableName, operationType, writeOptions)
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
index f3b190d..4fa59c1 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
@@ -133,7 +133,7 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
   @transient lazy val syncClient: KuduClient = asyncClient.syncClient()
 
   @transient lazy val asyncClient: AsyncKuduClient = {
-    val c = KuduClientCache.getAsyncClient(kuduMaster, socketReadTimeoutMs)
+    val c = KuduClientCache.getAsyncClient(kuduMaster)
     if (authnCredentials != null) {
       c.importAuthenticationCredentials(authnCredentials)
     }
@@ -445,7 +445,6 @@ private object KuduContext {
 private object KuduClientCache {
   val Log: Logger = LoggerFactory.getLogger(KuduClientCache.getClass)
 
-  private case class CacheKey(kuduMaster: String, socketReadTimeoutMs: Option[Long])
   private case class CacheValue(kuduClient: AsyncKuduClient, shutdownHookHandle: Runnable)
 
   /**
@@ -457,7 +456,7 @@ private object KuduClientCache {
    */
   private val ShutdownHookPriority = 100
 
-  private val clientCache = new mutable.HashMap[CacheKey, CacheValue]()
+  private val clientCache = new mutable.HashMap[String, CacheValue]()
 
   // Visible for testing.
   private[kudu] def clearCacheForTests() = {
@@ -476,25 +475,18 @@ private object KuduClientCache {
     clientCache.clear()
   }
 
-  def getAsyncClient(kuduMaster: String, socketReadTimeoutMs: Option[Long]): AsyncKuduClient = {
-    val cacheKey = CacheKey(kuduMaster, socketReadTimeoutMs)
+  def getAsyncClient(kuduMaster: String): AsyncKuduClient = {
     clientCache.synchronized {
-      if (!clientCache.contains(cacheKey)) {
-        val builder = new AsyncKuduClient.AsyncKuduClientBuilder(kuduMaster)
-        socketReadTimeoutMs match {
-          case Some(timeout) => builder.defaultSocketReadTimeoutMs(timeout)
-          case None =>
-        }
-
-        val asyncClient = builder.build()
+      if (!clientCache.contains(kuduMaster)) {
+        val asyncClient = new AsyncKuduClient.AsyncKuduClientBuilder(kuduMaster).build()
         val hookHandle = new Runnable {
           override def run(): Unit = asyncClient.close()
         }
         ShutdownHookManager.get().addShutdownHook(hookHandle, ShutdownHookPriority)
         val cacheValue = CacheValue(asyncClient, hookHandle)
-        clientCache.put(cacheKey, cacheValue)
+        clientCache.put(kuduMaster, cacheValue)
       }
-      return clientCache(cacheKey).kuduClient
+      return clientCache(kuduMaster).kuduClient
     }
   }
 }
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala
index a1983b5..82c3af8 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala
@@ -34,7 +34,7 @@ import org.apache.kudu.spark.kudu.KuduReadOptions._
  * @param keepAlivePeriodMs The period at which to send keep-alive requests to the tablet
  *                          server to ensure that scanners do not time out
  * @param scanRequestTimeoutMs Maximum time allowed per scan request, in milliseconds
- * @param socketReadTimeoutMs Maximum time allowed when waiting on data from a socket
+ * @param socketReadTimeoutMs This parameter is deprecated and has no effect
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
index ad2c110..cbc315e 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
@@ -951,24 +951,4 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
     val kuduRelation = kuduRelationFromDataFrame(dataFrame)
     assert(kuduRelation.readOptions.scanRequestTimeoutMs == Some(66666))
   }
-
-  /**
-   * Verify that the kudu.socketReadTimeoutMs parameter is parsed by the
-   * DefaultSource and makes it into the KuduRelation as a configuration
-   * parameter.
-   */
-  @Test
-  def testSocketReadTimeoutPropagation() {
-    kuduOptions = Map(
-      "kudu.table" -> tableName,
-      "kudu.master" -> harness.getMasterAddressesAsString,
-      "kudu.socketReadTimeoutMs" -> "66666")
-    // Even though we're ostensibly just checking for option propagation, the
-    // construction of 'dataFrame' involves a connection to the Kudu cluster,
-    // which is affected by the value of socketReadTimeoutMs. Therefore we must
-    // choose a value that's high enough to actually support establishing a connection.
-    val dataFrame = sqlContext.read.options(kuduOptions).format("kudu").load
-    val kuduRelation = kuduRelationFromDataFrame(dataFrame)
-    assert(kuduRelation.readOptions.socketReadTimeoutMs == Some(66666))
-  }
 }


[kudu] 03/07: Fix potential duplicate scanner on /scans page

Posted by aw...@apache.org.
awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit c1330bc2f71cd2e84ac16fdcdc4373841afc7bdb
Author: Will Berkeley <wd...@gmail.org>
AuthorDate: Tue Feb 26 13:15:08 2019 -0800

    Fix potential duplicate scanner on /scans page
    
    In rare scenarios, it is possible to see the same scanner listed twice
    on the /scans page. This can happen because we gather the list in two
    stages, first getting a list of in-flight scanners, and then getting a
    list of recently completed scanners. Each is done under a lock and so
    gives a consistent snapshot of in-flight or completed scanners,
    respectively, but if a scanner completes between gathering the list of
    in-flight scanners and gathering the list of completed scanners, it's
    possible to see that same scanner listed as both in-flight and also as
    completed.
    
    This patch changes the logic so that we deduplicate the list of
    scanners by scanner id, favoring the information about a completed
    scanner since it must be more up-to-date than the information
    captured while the scanner was in flight.
    
    Change-Id: I680546d80315a1548337a504c45afa4f2f0350ad
    Reviewed-on: http://gerrit.cloudera.org:8080/12600
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Tested-by: Will Berkeley <wd...@gmail.com>
---
 src/kudu/tserver/scanners.cc | 24 ++++++++++++++++--------
 1 file changed, 16 insertions(+), 8 deletions(-)

diff --git a/src/kudu/tserver/scanners.cc b/src/kudu/tserver/scanners.cc
index e1562e8..3c6342c 100644
--- a/src/kudu/tserver/scanners.cc
+++ b/src/kudu/tserver/scanners.cc
@@ -66,6 +66,7 @@ METRIC_DEFINE_gauge_size(server, active_scanners,
 
 using std::string;
 using std::unique_ptr;
+using std::unordered_map;
 using std::vector;
 using strings::Substitute;
 
@@ -221,31 +222,38 @@ void ScannerManager::ListScanners(std::vector<SharedScanner>* scanners) const {
 }
 
 vector<ScanDescriptor> ScannerManager::ListScans() const {
-  vector<ScanDescriptor> scans;
+  unordered_map<string, ScanDescriptor> scans;
   for (const ScannerMapStripe* stripe : scanner_maps_) {
     shared_lock<RWMutex> l(stripe->lock_);
     for (const auto& se : stripe->scanners_by_id_) {
       if (se.second->IsInitialized()) {
-        scans.emplace_back(se.second->descriptor());
-        scans.back().state = ScanState::kActive;
+        ScanDescriptor desc = se.second->descriptor();
+        desc.state = ScanState::kActive;
+        EmplaceOrDie(&scans, se.first, std::move(desc));
       }
     }
   }
 
   {
     shared_lock<RWMutex> l(completed_scans_lock_);
-    scans.insert(scans.end(), completed_scans_.begin(), completed_scans_.end());
+    // A scanner in 'scans' may have completed between the above loop and here.
+    // As we'd rather have the finalized descriptor of the completed scan,
+    // update over the old descriptor in this case.
+    for (const auto& scan : completed_scans_) {
+      InsertOrUpdate(&scans, scan.scanner_id, scan);
+    }
   }
 
-  // TODO(dan): It's possible for a descriptor to be included twice in the
-  // result set if its scanner is concurrently removed from the scanner map.
+  vector<ScanDescriptor> ret;
+  ret.reserve(scans.size());
+  AppendValuesFromMap(scans, &ret);
 
   // Sort oldest to newest, so that the ordering is consistent across calls.
-  std::sort(scans.begin(), scans.end(), [] (const ScanDescriptor& a, const ScanDescriptor& b) {
+  std::sort(ret.begin(), ret.end(), [] (const ScanDescriptor& a, const ScanDescriptor& b) {
       return a.start_time > b.start_time;
   });
 
-  return scans;
+  return ret;
 }
 
 void ScannerManager::RemoveExpiredScanners() {
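The deduplication described in this commit can be sketched outside Kudu as a keyed merge of two snapshots, where an entry from the later "completed" snapshot overwrites the earlier "in-flight" entry for the same scanner id. ScanDescriptor, ScanState and MergeScans below are simplified, hypothetical stand-ins rather than Kudu's real types; the patch itself uses Kudu's EmplaceOrDie, InsertOrUpdate and AppendValuesFromMap helpers. Note that a comparator of the form `a.start_time > b.start_time` puts the most recently started scan first, so the result is ordered newest to oldest.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical, simplified stand-ins for Kudu's scan bookkeeping types.
enum class ScanState { kActive, kComplete };

struct ScanDescriptor {
  std::string scanner_id;
  int64_t start_time;  // larger value = started later
  ScanState state;
};

// Merges two snapshots taken under separate locks. A scanner that finished
// between the two snapshots shows up in both; the completed copy wins.
std::vector<ScanDescriptor> MergeScans(const std::vector<ScanDescriptor>& in_flight,
                                       const std::vector<ScanDescriptor>& completed) {
  std::unordered_map<std::string, ScanDescriptor> by_id;
  for (const auto& desc : in_flight) {
    ScanDescriptor d = desc;
    d.state = ScanState::kActive;
    by_id.emplace(desc.scanner_id, d);  // ids are unique within one snapshot
  }
  for (const auto& desc : completed) {
    by_id[desc.scanner_id] = desc;  // insert, or overwrite a stale in-flight entry
  }

  std::vector<ScanDescriptor> ret;
  ret.reserve(by_id.size());
  for (auto& kv : by_id) ret.push_back(std::move(kv.second));

  // '>' orders the most recently started scan first (newest to oldest).
  std::sort(ret.begin(), ret.end(),
            [](const ScanDescriptor& a, const ScanDescriptor& b) {
              return a.start_time > b.start_time;
            });
  return ret;
}
```

The map gives a single entry per scanner id regardless of how the two snapshots interleave, which is why the old TODO about duplicates could be dropped.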


[kudu] 05/07: Reduce startup log spam

Posted by aw...@apache.org.
awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 2711b5c02b4d46c055cbf734ade99e8837f361a1
Author: Will Berkeley <wd...@gmail.org>
AuthorDate: Wed Feb 20 15:21:21 2019 -0800

    Reduce startup log spam
    
    This eliminates a lot of the messages at startup that really aren't
    useful:
    
    1. No more of this:
    
    I0220 13:18:51.160027 372970944 ts_tablet_manager.cc:1007] T 2f7ecbdb5c564106a2646ffe5f21f347 P 0a909baebce949a6aa4cdc0f196ecd00: Loading tablet metadata
    I0220 13:18:51.161830 372970944 ts_tablet_manager.cc:1007] T 27705b0198da406d8301830cd942c7ee P 0a909baebce949a6aa4cdc0f196ecd00: Loading tablet metadata
    I0220 13:18:51.162523 372970944 ts_tablet_manager.cc:1007] T 58f1243dad6346549370fbe50a739563 P 0a909baebce949a6aa4cdc0f196ecd00: Loading tablet metadata
    I0220 13:18:51.163156 372970944 ts_tablet_manager.cc:1007] T c4fc7d7a16e34f4aa87e1af2e1defa2e P 0a909baebce949a6aa4cdc0f196ecd00: Loading tablet metadata
    I0220 13:18:51.163739 372970944 ts_tablet_manager.cc:1007] T 28d12a741054442d988646178daad91f P 0a909baebce949a6aa4cdc0f196ecd00: Loading tablet metadata
    I0220 13:18:51.164413 372970944 ts_tablet_manager.cc:1007] T 0048470f677242dda0277aed2fef7b05 P 0a909baebce949a6aa4cdc0f196ecd00: Loading tablet metadata
    I0220 13:18:51.165114 372970944 ts_tablet_manager.cc:1007] T 2b9dc70391f54e3fbf116b46d86c2c50 P 0a909baebce949a6aa4cdc0f196ecd00: Loading tablet metadata
    ...
    
    Instead, just a periodic progress message and a summary message:
    
    I0220 15:15:29.755679 323278272 ts_tablet_manager.cc:344] Loading tablet metadata (0/60 complete)
    I0220 15:15:29.803109 323278272 ts_tablet_manager.cc:356] Loaded tablet metadata (60 total tablets, 60 live tablets)
    
    2. No more of this:
    
    I0220 13:18:51.200745 372970944 ts_tablet_manager.cc:1175] T 27705b0198da406d8301830cd942c7ee P 0a909baebce949a6aa4cdc0f196ecd00: Registered tablet (data state: TABLET_DATA_READY)
    I0220 13:18:51.201200 372970944 ts_tablet_manager.cc:1175] T 58f1243dad6346549370fbe50a739563 P 0a909baebce949a6aa4cdc0f196ecd00: Registered tablet (data state: TABLET_DATA_READY)
    I0220 13:18:51.201637 372970944 ts_tablet_manager.cc:1175] T c4fc7d7a16e34f4aa87e1af2e1defa2e P 0a909baebce949a6aa4cdc0f196ecd00: Registered tablet (data state: TABLET_DATA_READY)
    I0220 13:18:51.202126 372970944 ts_tablet_manager.cc:1175] T 28d12a741054442d988646178daad91f P 0a909baebce949a6aa4cdc0f196ecd00: Registered tablet (data state: TABLET_DATA_READY)
    I0220 13:18:51.202700 372970944 ts_tablet_manager.cc:1175] T 0048470f677242dda0277aed2fef7b05 P 0a909baebce949a6aa4cdc0f196ecd00: Registered tablet (data state: TABLET_DATA_READY)
    I0220 13:18:51.203141 372970944 ts_tablet_manager.cc:1175] T 2b9dc70391f54e3fbf116b46d86c2c50 P 0a909baebce949a6aa4cdc0f196ecd00: Registered tablet (data state: TABLET_DATA_READY)
    ...
    
    Instead, just a periodic progress message and a summary message:
    
    I0220 15:15:29.803143 323278272 ts_tablet_manager.cc:362] Registering tablets (0/60 complete)
    I0220 15:15:29.822479 323278272 ts_tablet_manager.cc:376] Registered 60 tablets
    
    3. Condense webserver startup messages. Instead of
    
    I0220 13:18:51.234637 372970944 webserver.cc:175] Starting webserver on 127.0.0.1:8050
    I0220 13:18:51.234654 372970944 webserver.cc:180] Document root: /Users/wdberkeley/src/kudu/www
    I0220 13:18:51.235159 372970944 webserver.cc:313] Webserver started. Bound to: http://127.0.0.1:8050/
    
    just
    
    I0220 15:15:29.834751 323278272 webserver.cc:307] Webserver started at http://127.0.0.1:8050/ using document root /Users/wdberkeley/src/kudu/www and password file <none>
    
    Note that the protocol 'http://' will now correctly show as 'https://'
    if the webserver is configured for SSL.
    
    4. I got rid of some of the highest-level, lowest-information-content
       messages about the "tablet server starting", because they just mean
       we finished executing code in tablet_server_main.cc, not that the
       tablet server is ready to do anything in particular.
    
    Change-Id: I3793a2385612cf920a94e5f62a559c350b8bf461
    Reviewed-on: http://gerrit.cloudera.org:8080/12540
    Tested-by: Will Berkeley <wd...@gmail.com>
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/master/master_main.cc         |  3 ---
 src/kudu/server/webserver.cc           | 37 +++++++++++++++++-----------------
 src/kudu/tablet/tablet_bootstrap.cc    |  5 +----
 src/kudu/tserver/tablet_server_main.cc |  3 ---
 src/kudu/tserver/ts_tablet_manager.cc  | 22 ++++++++++++--------
 5 files changed, 33 insertions(+), 37 deletions(-)

diff --git a/src/kudu/master/master_main.cc b/src/kudu/master/master_main.cc
index 1abb837..583a185 100644
--- a/src/kudu/master/master_main.cc
+++ b/src/kudu/master/master_main.cc
@@ -77,13 +77,10 @@ static int MasterMain(int argc, char** argv) {
 
   MasterOptions opts;
   Master server(opts);
-  LOG(INFO) << "Initializing master server...";
   CHECK_OK(server.Init());
 
-  LOG(INFO) << "Starting Master server...";
   CHECK_OK(server.Start());
 
-  LOG(INFO) << "Master server successfully started.";
   while (true) {
     SleepFor(MonoDelta::FromSeconds(60));
   }
diff --git a/src/kudu/server/webserver.cc b/src/kudu/server/webserver.cc
index d0fde6b..304d3e8 100644
--- a/src/kudu/server/webserver.cc
+++ b/src/kudu/server/webserver.cc
@@ -162,9 +162,10 @@ Status Webserver::BuildListenSpec(string* spec) const {
   RETURN_NOT_OK(ParseAddressList(http_address_, 80, &addrs));
 
   vector<string> parts;
+  parts.reserve(addrs.size());
   for (const Sockaddr& addr : addrs) {
-    // Mongoose makes sockets with 's' suffixes accept SSL traffic only
-    parts.push_back(addr.ToString() + (IsSecure() ? "s" : ""));
+    // Mongoose makes sockets with 's' suffixes accept SSL traffic only.
+    parts.emplace_back(addr.ToString() + (IsSecure() ? "s" : ""));
   }
 
   JoinStrings(parts, ",", spec);
@@ -172,25 +173,17 @@ Status Webserver::BuildListenSpec(string* spec) const {
 }
 
 Status Webserver::Start() {
-  LOG(INFO) << "Starting webserver on " << http_address_;
-
   vector<string> options;
-
   if (static_pages_available()) {
-    LOG(INFO) << "Document root: " << opts_.doc_root;
     options.emplace_back("document_root");
     options.push_back(opts_.doc_root);
     options.emplace_back("enable_directory_listing");
     options.emplace_back("no");
-  } else {
-    LOG(INFO)<< "Document root disabled";
   }
 
   if (IsSecure()) {
-    LOG(INFO) << "Webserver: Enabling HTTPS support";
-
-    // Initialize OpenSSL, and prevent Squeasel from also performing global OpenSSL
-    // initialization.
+    // Initialize OpenSSL, and prevent Squeasel from also performing global
+    // OpenSSL initialization.
     security::InitializeOpenSSL();
     options.emplace_back("ssl_global_init");
     options.emplace_back("false");
@@ -208,7 +201,7 @@ Status Webserver::Start() {
                                                             &key_password));
       }
       options.emplace_back("ssl_private_key_password");
-      options.push_back(key_password); // maybe empty if not configured.
+      options.push_back(key_password); // May be empty if not configured.
     }
 
     options.emplace_back("ssl_ciphers");
@@ -223,14 +216,13 @@ Status Webserver::Start() {
   }
 
   if (!opts_.password_file.empty()) {
-    // Mongoose doesn't log anything if it can't stat the password file (but will if it
-    // can't open it, which it tries to do during a request)
+    // Mongoose doesn't log anything if it can't stat the password file (but
+    // will if it can't open it, which it tries to do during a request).
     if (!Env::Default()->FileExists(opts_.password_file)) {
       ostringstream ss;
       ss << "Webserver: Password file does not exist: " << opts_.password_file;
       return Status::InvalidArgument(ss.str());
     }
-    LOG(INFO) << "Webserver: Password file is " << opts_.password_file;
     options.emplace_back("global_auth_file");
     options.push_back(opts_.password_file);
   }
@@ -298,7 +290,8 @@ Status Webserver::Start() {
       std::bind<void>(std::mem_fn(&Webserver::RootHandler),
                       this, std::placeholders::_1, std::placeholders::_2);
 
-  RegisterPathHandler("/", "Home", default_callback, true /* styled */, true /* on_nav_bar */);
+  RegisterPathHandler("/", "Home", default_callback,
+                      /*is_styled=*/true, /*is_on_nav_bar=*/true);
 
   vector<Sockaddr> addrs;
   RETURN_NOT_OK(GetBoundAddresses(&addrs));
@@ -307,10 +300,16 @@ Status Webserver::Start() {
     if (!bound_addresses_str.empty()) {
       bound_addresses_str += ", ";
     }
-    bound_addresses_str += "http://" + addr.ToString() + "/";
+    bound_addresses_str += Substitute("$0$1/",
+                                      IsSecure() ? "https://" : "http://",
+                                      addr.ToString());
   }
 
-  LOG(INFO) << "Webserver started. Bound to: " << bound_addresses_str;
+  LOG(INFO) << Substitute(
+      "Webserver started at $0 using document root $1 and password file $2",
+      bound_addresses_str,
+      static_pages_available() ? opts_.doc_root : "<none>",
+      opts_.password_file.empty() ? "<none>" : opts_.password_file);
   return Status::OK();
 }
 
diff --git a/src/kudu/tablet/tablet_bootstrap.cc b/src/kudu/tablet/tablet_bootstrap.cc
index 2b03540..d37157e 100644
--- a/src/kudu/tablet/tablet_bootstrap.cc
+++ b/src/kudu/tablet/tablet_bootstrap.cc
@@ -436,9 +436,7 @@ class TabletBootstrap {
 };
 
 void TabletBootstrap::SetStatusMessage(const string& status) {
-  LOG(INFO) << "T " << tablet_meta_->tablet_id()
-            << " P " << tablet_meta_->fs_manager()->uuid() << ": "
-            << status;
+  LOG_WITH_PREFIX(INFO) << status;
   if (tablet_replica_) tablet_replica_->SetStatusMessage(status);
 }
 
@@ -567,7 +565,6 @@ Status TabletBootstrap::RunBootstrap(shared_ptr<Tablet>* rebuilt_tablet,
     VLOG_WITH_PREFIX(1) << "Tablet Metadata: " << SecureDebugString(super_block);
   }
 
-
   // Ensure the tablet's data dirs are present and healthy before it is opened.
   DataDirGroupPB data_dir_group;
   RETURN_NOT_OK_PREPEND(
diff --git a/src/kudu/tserver/tablet_server_main.cc b/src/kudu/tserver/tablet_server_main.cc
index 17989be..7c7ca60 100644
--- a/src/kudu/tserver/tablet_server_main.cc
+++ b/src/kudu/tserver/tablet_server_main.cc
@@ -82,15 +82,12 @@ static int TabletServerMain(int argc, char** argv) {
 
   TabletServerOptions opts;
   TabletServer server(opts);
-  LOG(INFO) << "Initializing tablet server...";
   CHECK_OK(server.Init());
 
   MAYBE_FAULT(FLAGS_fault_before_start);
 
-  LOG(INFO) << "Starting tablet server...";
   CHECK_OK(server.Start());
 
-  LOG(INFO) << "Tablet server successfully started.";
   while (true) {
     SleepFor(MonoDelta::FromSeconds(60));
   }
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index b365ba4..2dc868e 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -334,7 +334,7 @@ Status TSTabletManager::Init() {
 
   InitLocalRaftPeerPB();
 
-  vector<scoped_refptr<TabletMetadata> > metas;
+  vector<scoped_refptr<TabletMetadata>> metas;
 
   // First, load all of the tablet metadata. We do this before we start
   // submitting the actual OpenTablet() tasks so that we don't have to compete
@@ -347,16 +347,20 @@ Status TSTabletManager::Init() {
     RETURN_NOT_OK_PREPEND(OpenTabletMeta(tablet_id, &meta),
                           "Failed to open tablet metadata for tablet: " + tablet_id);
     loaded_count++;
-    if (PREDICT_FALSE(meta->tablet_data_state() != TABLET_DATA_READY)) {
+    if (meta->tablet_data_state() != TABLET_DATA_READY) {
       RETURN_NOT_OK(HandleNonReadyTabletOnStartup(meta));
       continue;
     }
     metas.push_back(meta);
   }
-  LOG(INFO) << Substitute("Loaded tablet metadata ($0 live tablets)", metas.size());
+  LOG(INFO) << Substitute("Loaded tablet metadata ($0 total tablets, $1 live tablets)",
+                          loaded_count, metas.size());
 
   // Now submit the "Open" task for each.
-  for (const scoped_refptr<TabletMetadata>& meta : metas) {
+  int registered_count = 0;
+  for (const auto& meta : metas) {
+    KLOG_EVERY_N_SECS(INFO, 1) << Substitute("Registering tablets ($0/$1 complete)",
+                                             registered_count, metas.size());
     scoped_refptr<TransitionInProgressDeleter> deleter;
     {
       std::lock_guard<RWMutex> lock(lock_);
@@ -367,7 +371,9 @@ Status TSTabletManager::Init() {
     RETURN_NOT_OK(CreateAndRegisterTabletReplica(meta, NEW_REPLICA, &replica));
     RETURN_NOT_OK(open_tablet_pool_->SubmitFunc(boost::bind(&TSTabletManager::OpenTablet,
                                                             this, replica, deleter)));
+    registered_count++;
   }
+  LOG(INFO) << Substitute("Registered $0 tablets", registered_count);
 
   {
     std::lock_guard<RWMutex> lock(lock_);
@@ -1004,7 +1010,7 @@ Status TSTabletManager::StartTabletStateTransitionUnlocked(
 
 Status TSTabletManager::OpenTabletMeta(const string& tablet_id,
                                        scoped_refptr<TabletMetadata>* metadata) {
-  LOG(INFO) << LogPrefix(tablet_id) << "Loading tablet metadata";
+  VLOG(1) << LogPrefix(tablet_id) << "Loading tablet metadata";
   TRACE("Loading metadata...");
   scoped_refptr<TabletMetadata> meta;
   RETURN_NOT_OK_PREPEND(TabletMetadata::Load(fs_manager_, tablet_id, &meta),
@@ -1027,7 +1033,7 @@ void TSTabletManager::OpenTablet(const scoped_refptr<TabletReplica>& replica,
   shared_ptr<Tablet> tablet;
   scoped_refptr<Log> log;
 
-  LOG(INFO) << LogPrefix(tablet_id) << "Bootstrapping tablet";
+  VLOG(1) << LogPrefix(tablet_id) << "Bootstrapping tablet";
   TRACE("Bootstrapping tablet");
 
   scoped_refptr<ConsensusMetadata> cmeta;
@@ -1172,8 +1178,8 @@ void TSTabletManager::RegisterTablet(const std::string& tablet_id,
   }
 
   TabletDataState data_state = replica->tablet_metadata()->tablet_data_state();
-  LOG(INFO) << LogPrefix(tablet_id) << Substitute("Registered tablet (data state: $0)",
-                                                  TabletDataState_Name(data_state));
+  VLOG(1) << LogPrefix(tablet_id) << Substitute("Registered tablet (data state: $0)",
+                                                TabletDataState_Name(data_state));
 }
 
 bool TSTabletManager::LookupTablet(const string& tablet_id,
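The "periodic progress message plus summary message" pattern from items 1 and 2 can be approximated with a small time-based throttle. This is a hypothetical, self-contained sketch: it is not the implementation of Kudu's KLOG_EVERY_N_SECS macro, and ThrottledLogger and RegisterTablets are names invented for illustration. The clock is injectable so the throttling can be tested deterministically.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <iostream>
#include <string>
#include <utility>

// Hypothetical helper: emits at most one message per 'period'.
class ThrottledLogger {
 public:
  using Clock = std::chrono::steady_clock;

  explicit ThrottledLogger(Clock::duration period,
                           std::function<Clock::time_point()> now = Clock::now)
      : period_(period), now_(std::move(now)) {}

  // Returns true if the message was written, false if it was suppressed.
  bool Log(const std::string& msg) {
    Clock::time_point t = now_();
    if (emitted_any_ && t - last_ < period_) return false;
    emitted_any_ = true;
    last_ = t;
    std::cerr << msg << '\n';
    return true;
  }

 private:
  Clock::duration period_;
  std::function<Clock::time_point()> now_;
  Clock::time_point last_{};
  bool emitted_any_ = false;
};

// Mirrors the shape of the registration loop in the diff: a throttled
// progress line at the top of each iteration, one summary line at the end.
int RegisterTablets(int total, ThrottledLogger& progress) {
  int registered = 0;
  for (int i = 0; i < total; ++i) {
    progress.Log("Registering tablets (" + std::to_string(registered) + "/" +
                 std::to_string(total) + " complete)");
    ++registered;  // stand-in for the real per-tablet registration work
  }
  std::cerr << "Registered " << registered << " tablets\n";
  return registered;
}
```

With a fast startup, only the first "(0/N complete)" progress line and the summary line are printed, which matches the sample output in the commit message; a slow startup adds roughly one progress line per period.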


[kudu] 01/07: make --location_mapping_cmd non-experimental

Posted by aw...@apache.org.
awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 06f8c1750148334bc893f2dd8c6f9042094d09bf
Author: Andrew Wong <aw...@apache.org>
AuthorDate: Mon Feb 25 22:46:02 2019 -0800

    make --location_mapping_cmd non-experimental
    
    "experimental" implies that this is not a supported feature, but it is.
    "evolving", the default, seems more appropriate.
    
    Change-Id: I5d377f9027a167c8577f8b7c262c3ba2c65e2fc9
    Reviewed-on: http://gerrit.cloudera.org:8080/12594
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Tested-by: Kudu Jenkins
    (cherry picked from commit 42a68d1f6539ed341024d19f22081d7eda0116d7)
    Reviewed-on: http://gerrit.cloudera.org:8080/12603
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Tested-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/master/ts_descriptor.cc | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/kudu/master/ts_descriptor.cc b/src/kudu/master/ts_descriptor.cc
index 321b2fd..619f927 100644
--- a/src/kudu/master/ts_descriptor.cc
+++ b/src/kudu/master/ts_descriptor.cc
@@ -58,7 +58,6 @@ DEFINE_string(location_mapping_cmd, "",
               "and consists of /-separated tokens each of which contains only "
               "characters from the set [a-zA-Z0-9_-.]. If the cluster is not "
               "using location awareness features this flag should not be set.");
-TAG_FLAG(location_mapping_cmd, experimental);
 
 DEFINE_bool(location_mapping_by_uuid, false,
             "Whether the location command is given tablet server identifier "
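The flag help text above describes the expected location format as /-separated tokens drawn from the characters [a-zA-Z0-9_-.]. The checker below is a hypothetical sketch of that rule, not Kudu's own validation code. It assumes, beyond what is quoted here, that a location must start with '/' and that tokens may not be empty (so "/", "//x" and a trailing '/' are rejected), and it reads "_-." as three literal characters rather than a range.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// True if 'c' is one of [a-zA-Z0-9], '_', '-', or '.'.
bool IsLocationChar(char c) {
  return (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') ||
         (c >= '0' && c <= '9') || c == '_' || c == '-' || c == '.';
}

// Hypothetical validator. Assumptions: leading '/' required, no empty tokens.
bool IsValidLocation(const std::string& loc) {
  if (loc.empty() || loc[0] != '/') return false;
  std::size_t token_len = 0;
  for (std::size_t i = 1; i < loc.size(); ++i) {
    const char c = loc[i];
    if (c == '/') {
      if (token_len == 0) return false;  // empty token, e.g. "//x"
      token_len = 0;
    } else if (IsLocationChar(c)) {
      ++token_len;
    } else {
      return false;  // character outside the allowed set
    }
  }
  return token_len > 0;  // rejects "/" and a trailing '/'
}
```

For example, "/dc0/rack09" would be accepted, while "dc0", "/dc0/" and "/dc 0" would be rejected under these assumptions.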