You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2019/11/21 23:26:47 UTC

[kudu] branch master updated (0eb0a60 -> c050809)

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.


    from 0eb0a60  log: refactor close and replace last segment
     new b58b37a  KUDU-2162 Expose stats about scan filters
     new c050809  [examples] use org.apache.kudu packages v1.11.1

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 examples/java/collectl/README.adoc                 |  2 +-
 examples/java/collectl/pom.xml                     |  6 +-
 examples/java/insert-loadgen/pom.xml               |  6 +-
 examples/java/java-example/pom.xml                 |  2 +-
 examples/quickstart/nifi/README.adoc               |  4 +-
 examples/quickstart/spark/README.adoc              |  2 +-
 examples/scala/spark-example/pom.xml               |  5 +-
 .../org/apache/kudu/client/AsyncKuduScanner.java   | 27 ++++++-
 .../java/org/apache/kudu/client/KuduScanner.java   |  8 ++
 .../org/apache/kudu/client/ResourceMetrics.java    | 87 ++++++++++++++++++++++
 .../apache/kudu/client/TestScannerMultiTablet.java | 22 ++++++
 src/kudu/client/client-test.cc                     | 11 +++
 src/kudu/rpc/inbound_call.cc                       |  8 +-
 src/kudu/rpc/inbound_call.h                        | 11 +++
 src/kudu/rpc/rpc_context.cc                        |  6 +-
 src/kudu/rpc/rpc_context.h                         |  5 +-
 src/kudu/tserver/scanners.cc                       |  5 ++
 src/kudu/tserver/scanners.h                        | 11 ++-
 src/kudu/tserver/tablet_service.cc                 | 40 ++++++++--
 src/kudu/tserver/tserver.proto                     | 15 +++-
 20 files changed, 261 insertions(+), 22 deletions(-)
 create mode 100644 java/kudu-client/src/main/java/org/apache/kudu/client/ResourceMetrics.java


[kudu] 01/02: KUDU-2162 Expose stats about scan filters

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit b58b37a1ffe359f8237e1ff7a834080cab111339
Author: Thomas D'Silva <td...@apache.org>
AuthorDate: Mon Oct 16 15:37:59 2017 -0700

    KUDU-2162 Expose stats about scan filters
    
    This patch adds the following resource metrics to scanners.
    - bytes read, from disk or cache
    - scan rpc wait and total duration scanner was open in nanoseconds
    - cpu time and system time in nanoseconds
    These metrics can be used to roughly compare the amount of work done by
    scan operations, and could be useful for runtime optimizations in query
    planners like Impala or Spark.
    
    Change-Id: Id30a7e82357fe2fc28f6d316378a612af43d8c96
    Reviewed-on: http://gerrit.cloudera.org:8080/8375
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
    Tested-by: Adar Dembo <ad...@cloudera.com>
---
 .../org/apache/kudu/client/AsyncKuduScanner.java   | 27 ++++++-
 .../java/org/apache/kudu/client/KuduScanner.java   |  8 ++
 .../org/apache/kudu/client/ResourceMetrics.java    | 87 ++++++++++++++++++++++
 .../apache/kudu/client/TestScannerMultiTablet.java | 22 ++++++
 src/kudu/client/client-test.cc                     | 11 +++
 src/kudu/rpc/inbound_call.cc                       |  8 +-
 src/kudu/rpc/inbound_call.h                        | 11 +++
 src/kudu/rpc/rpc_context.cc                        |  6 +-
 src/kudu/rpc/rpc_context.h                         |  5 +-
 src/kudu/tserver/scanners.cc                       |  5 ++
 src/kudu/tserver/scanners.h                        | 11 ++-
 src/kudu/tserver/tablet_service.cc                 | 40 ++++++++--
 src/kudu/tserver/tserver.proto                     | 15 +++-
 13 files changed, 242 insertions(+), 14 deletions(-)

diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index 9628bd2..e194bf0 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -29,6 +29,7 @@ package org.apache.kudu.client;
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.kudu.tserver.Tserver.NewScanRequestPB;
+import static org.apache.kudu.tserver.Tserver.ResourceMetricsPB;
 import static org.apache.kudu.tserver.Tserver.ScanRequestPB;
 import static org.apache.kudu.tserver.Tserver.ScanResponsePB;
 import static org.apache.kudu.tserver.Tserver.TabletServerErrorPB;
@@ -221,6 +222,8 @@ public final class AsyncKuduScanner {
 
   private boolean reuseRowResult = false;
 
+  private final ResourceMetrics resourceMetrics = new ResourceMetrics();
+
   private boolean closed = false;
 
   private boolean hasMore = true;
@@ -452,6 +455,15 @@ public final class AsyncKuduScanner {
     return this.startTimestamp;
   }
 
+  /**
+   * Returns the {@code ResourceMetrics} for this scanner. These metrics are
+   * updated with each batch of rows returned from the server.
+   * @return the resource metrics for this scanner
+   */
+  public ResourceMetrics getResourceMetrics() {
+    return this.resourceMetrics;
+  }
+
   long getSnapshotTimestamp() {
     return this.htTimestamp;
   }
@@ -520,6 +532,9 @@ public final class AsyncKuduScanner {
           }
 
           numRowsReturned += resp.data.getNumRows();
+          if (resp.resourceMetricsPb != null) {
+            resourceMetrics.update(resp.resourceMetricsPb);
+          }
 
           if (!resp.more || resp.scannerId == null) {
             scanFinished();
@@ -561,7 +576,7 @@ public final class AsyncKuduScanner {
           } else {
             LOG.debug("Can not open scanner", e);
             // Don't let the scanner think it's opened on this tablet.
-            return Deferred.fromError(e); // Let the error propogate.
+            return Deferred.fromError(e); // Let the error propagate.
           }
         }
 
@@ -877,18 +892,22 @@ public final class AsyncKuduScanner {
 
     private final byte[] lastPrimaryKey;
 
+    private final ResourceMetricsPB resourceMetricsPb;
+
     Response(final byte[] scannerId,
              final RowResultIterator data,
              final boolean more,
              final long scanTimestamp,
              final long propagatedTimestamp,
-             final byte[] lastPrimaryKey) {
+             final byte[] lastPrimaryKey,
+             final ResourceMetricsPB resourceMetricsPb) {
       this.scannerId = scannerId;
       this.data = data;
       this.more = more;
       this.scanTimestamp = scanTimestamp;
       this.propagatedTimestamp = propagatedTimestamp;
       this.lastPrimaryKey = lastPrimaryKey;
+      this.resourceMetricsPb = resourceMetricsPb;
     }
 
     @Override
@@ -1131,12 +1150,14 @@ public final class AsyncKuduScanner {
             Bytes.pretty(scannerId));
         throw new NonRecoverableException(statusIllegalState);
       }
+      ResourceMetricsPB resourceMetricsPB = resp.hasResourceMetrics() ?
+          resp.getResourceMetrics() : null;
       Response response = new Response(id, iterator, hasMore,
           resp.hasSnapTimestamp() ? resp.getSnapTimestamp()
                                   : AsyncKuduClient.NO_TIMESTAMP,
           resp.hasPropagatedTimestamp() ? resp.getPropagatedTimestamp()
                                         : AsyncKuduClient.NO_TIMESTAMP,
-          resp.getLastPrimaryKey().toByteArray());
+          resp.getLastPrimaryKey().toByteArray(), resourceMetricsPB);
       if (LOG.isDebugEnabled()) {
         LOG.debug("{} for scanner {}", response.toString(), AsyncKuduScanner.this);
       }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
index 0db7f3b..8e5505c 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
@@ -154,6 +154,14 @@ public class KuduScanner implements Iterable<RowResult> {
   }
 
   /**
+   * Returns the resource metrics of this scanner.
+   * @return the resource metrics for this scanner
+   */
+  public ResourceMetrics getResourceMetrics() {
+    return asyncScanner.getResourceMetrics();
+  }
+
+  /**
    * Returns the RemoteTablet currently being scanned, if any.
    */
   @InterfaceAudience.LimitedPrivate("Test")
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ResourceMetrics.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ResourceMetrics.java
new file mode 100644
index 0000000..d9a3282
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ResourceMetrics.java
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import static com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.stream.Collectors;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+
+import org.apache.kudu.tserver.Tserver.ResourceMetricsPB;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+/**
+ * A container for scanner resource metrics.
+ * <p>
+ * This class wraps a mapping from metric name to metric value for server-side
+ * metrics associated with a scanner.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ResourceMetrics {
+  private Map<String, LongAdder> metrics = new ConcurrentHashMap<>();
+
+  /**
+   * Returns a copy of this ResourceMetrics's underlying map of metric name to
+   * metric value.
+   * @return a map of metric name to metric value
+   */
+  public Map<String, Long> get() {
+      return metrics.entrySet().stream()
+              .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().sum()));
+  }
+
+  /**
+   * Returns the value of the metric named by 'name', or 0 if there is no such metric.
+   * @param name the name of the metric to get the value for
+   * @return the value of the named metric; if the metric is not found, returns 0
+   */
+  public long getMetric(String name) {
+      return metrics.getOrDefault(name, new LongAdder()).sum();
+  }
+
+  /**
+   * Increment this instance's metric values with those found in 'resourceMetricsPb'.
+   * @param resourceMetricsPb resource metrics protobuf object to be used to update this object
+   */
+  void update(ResourceMetricsPB resourceMetricsPb) {
+    Preconditions.checkNotNull(resourceMetricsPb);
+    for (Map.Entry<FieldDescriptor, Object> entry : resourceMetricsPb.getAllFields().entrySet()) {
+      FieldDescriptor field = entry.getKey();
+      if (field.getJavaType() == JavaType.LONG) {
+        increment(field.getName(), (Long) entry.getValue());
+      }
+    }
+  }
+
+  /**
+   * Increment the metric value by the specific amount.
+   * @param name the name of the metric whose value is to be incremented
+   * @param amount the amount to increment the value by
+   */
+  private void increment(String name, long amount) {
+    metrics.computeIfAbsent(name, k -> new LongAdder()).add(amount);
+  }
+}
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java
index 9d742b2..ccdc965 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
+import java.util.Map;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -106,6 +107,27 @@ public class TestScannerMultiTablet {
     asyncClient = harness.getAsyncClient();
   }
 
+  private void validateResourceMetrics(ResourceMetrics resourceMetrics) {
+      assertTrue("queue_duration_nanos > 0",
+              resourceMetrics.getMetric("queue_duration_nanos") > 0L);
+      assertTrue("total_duration_nanos > 0",
+              resourceMetrics.getMetric("total_duration_nanos") > 0L);
+  }
+
+  // Test scanner resource metrics.
+  @Test(timeout = 100000)
+  public void testResourceMetrics() throws Exception {
+    // Scan one tablet and the whole table.
+    AsyncKuduScanner oneTabletScanner = getScanner("1", "1", "1", "4"); // Whole second tablet.
+    assertEquals(3, countRowsInScan(oneTabletScanner));
+    AsyncKuduScanner fullTableScanner = getScanner(null, null, null, null);
+    assertEquals(9, countRowsInScan(fullTableScanner));
+    // Both scans should take a positive amount of wait duration, total duration, cpu user and cpu
+    // system time
+    validateResourceMetrics(oneTabletScanner.getResourceMetrics());
+    validateResourceMetrics(fullTableScanner.getResourceMetrics());
+  }
+
   // Test various combinations of start/end row keys.
   @Test(timeout = 100000)
   public void testKeyStartEnd() throws Exception {
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 7a66110..d020987 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -426,6 +426,7 @@ class ClientTest : public KuduTest {
     return del;
   }
 
+
   void DoTestScanResourceMetrics() {
     KuduScanner scanner(client_table_.get());
     string tablet_id = GetFirstTabletId(client_table_.get());
@@ -443,6 +444,16 @@ class ClientTest : public KuduTest {
       ASSERT_TRUE(ContainsKey(metrics, "cfile_cache_miss_bytes"));
       ASSERT_TRUE(ContainsKey(metrics, "cfile_cache_hit_bytes"));
       ASSERT_GT(metrics["cfile_cache_miss_bytes"] + metrics["cfile_cache_hit_bytes"], 0);
+
+      ASSERT_TRUE(ContainsKey(metrics, "total_duration_nanos"));
+      ASSERT_GT(metrics["total_duration_nanos"], 0);
+      ASSERT_TRUE(ContainsKey(metrics, "queue_duration_nanos"));
+      ASSERT_GT(metrics["queue_duration_nanos"], 0);
+      ASSERT_TRUE(ContainsKey(metrics, "cpu_user_nanos"));
+      ASSERT_TRUE(ContainsKey(metrics, "cpu_system_nanos"));
+
+      ASSERT_TRUE(ContainsKey(metrics, "bytes_read"));
+      ASSERT_GT(metrics["bytes_read"], 0);
     }
   }
 
diff --git a/src/kudu/rpc/inbound_call.cc b/src/kudu/rpc/inbound_call.cc
index 655c453..e6c33e6 100644
--- a/src/kudu/rpc/inbound_call.cc
+++ b/src/kudu/rpc/inbound_call.cc
@@ -302,7 +302,7 @@ void InboundCall::RecordHandlingCompleted() {
 
   if (method_info_) {
     method_info_->handler_latency_histogram->Increment(
-        (timing_.time_completed - timing_.time_handled).ToMicroseconds());
+        (timing_.ProcessingDuration()).ToMicroseconds());
   }
 }
 
@@ -311,9 +311,15 @@ bool InboundCall::ClientTimedOut() const {
 }
 
 MonoTime InboundCall::GetTimeReceived() const {
+  DCHECK(timing_.time_received.Initialized());
   return timing_.time_received;
 }
 
+MonoTime InboundCall::GetTimeHandled() const {
+  DCHECK(timing_.time_handled.Initialized());
+  return timing_.time_handled;
+}
+
 vector<uint32_t> InboundCall::GetRequiredFeatures() const {
   vector<uint32_t> features;
   for (uint32_t feature : header_.required_feature_flags()) {
diff --git a/src/kudu/rpc/inbound_call.h b/src/kudu/rpc/inbound_call.h
index 07c57dc..2edab8b 100644
--- a/src/kudu/rpc/inbound_call.h
+++ b/src/kudu/rpc/inbound_call.h
@@ -67,6 +67,14 @@ struct InboundCallTiming {
   MonoDelta TotalDuration() const {
     return time_completed - time_received;
   }
+
+  MonoDelta ProcessingDuration() const {
+    return time_completed - time_handled;
+  }
+
+  MonoDelta QueueDuration() const {
+    return time_handled - time_received;
+  }
 };
 
 // Inbound call on server
@@ -192,6 +200,9 @@ class InboundCall {
   // Return the time when this call was received.
   MonoTime GetTimeReceived() const;
 
+  // Return the time when this call was handled.
+  MonoTime GetTimeHandled() const;
+
   // Returns the set of application-specific feature flags required to service
   // the RPC.
   std::vector<uint32_t> GetRequiredFeatures() const;
diff --git a/src/kudu/rpc/rpc_context.cc b/src/kudu/rpc/rpc_context.cc
index 9f95358..a65c137 100644
--- a/src/kudu/rpc/rpc_context.cc
+++ b/src/kudu/rpc/rpc_context.cc
@@ -192,7 +192,11 @@ MonoTime RpcContext::GetTimeReceived() const {
   return call_->GetTimeReceived();
 }
 
-Trace* RpcContext::trace() {
+MonoTime RpcContext::GetTimeHandled() const {
+  return call_->GetTimeHandled();
+}
+
+Trace* RpcContext::trace() const {
   return call_->trace();
 }
 
diff --git a/src/kudu/rpc/rpc_context.h b/src/kudu/rpc/rpc_context.h
index 38ce703..b6ecaf6 100644
--- a/src/kudu/rpc/rpc_context.h
+++ b/src/kudu/rpc/rpc_context.h
@@ -81,7 +81,7 @@ class RpcContext {
   void SetResultTracker(scoped_refptr<ResultTracker> result_tracker);
 
   // Return the trace buffer for this call.
-  Trace* trace();
+  Trace* trace() const;
 
   // Send a response to the call. The service may call this method
   // before or after returning from the original handler method,
@@ -213,6 +213,9 @@ class RpcContext {
   // Return the time when the inbound call was received.
   MonoTime GetTimeReceived() const;
 
+  // Return the time when the inbound call was handled.
+  MonoTime GetTimeHandled() const;
+
   // Whether the results of this RPC are tracked with a ResultTracker.
   // If this returns true, both result_tracker() and request_id() should return non-null results.
   bool AreResultsTracked() const { return result_tracker_.get() != nullptr; }
diff --git a/src/kudu/tserver/scanners.cc b/src/kudu/tserver/scanners.cc
index 2b1aa7b..77b851b 100644
--- a/src/kudu/tserver/scanners.cc
+++ b/src/kudu/tserver/scanners.cc
@@ -427,5 +427,10 @@ ScanDescriptor Scanner::descriptor() const {
   return descriptor;
 }
 
+CpuTimes Scanner::cpu_times() const {
+  std::lock_guard<simple_spinlock> l(lock_);
+  return cpu_times_;
+}
+
 } // namespace tserver
 } // namespace kudu
diff --git a/src/kudu/tserver/scanners.h b/src/kudu/tserver/scanners.h
index b014912..f03e8ff 100644
--- a/src/kudu/tserver/scanners.h
+++ b/src/kudu/tserver/scanners.h
@@ -325,6 +325,8 @@ class Scanner {
 
   ScanDescriptor descriptor() const;
 
+  CpuTimes cpu_times() const;
+
  private:
   friend class ScannerManager;
 
@@ -438,9 +440,12 @@ struct ScanDescriptor {
 class ScopedAddScannerTiming {
  public:
   // 'scanner' must outlive the scoped object.
-  explicit ScopedAddScannerTiming(Scanner* scanner)
+  // object pointed to by 'cpu_times' will contain the cpu timing information of the scanner upon
+  // scope exit
+  explicit ScopedAddScannerTiming(Scanner* scanner, CpuTimes* cpu_times)
       : stopped_(false),
-        scanner_(scanner) {
+        scanner_(scanner),
+        cpu_times_(cpu_times) {
     sw_.start();
   }
 
@@ -456,10 +461,12 @@ class ScopedAddScannerTiming {
     sw_.stop();
     scanner_->AddTimings(sw_.elapsed());
     scanner_->UpdateAccessTime();
+    *cpu_times_ = scanner_->cpu_times();
   }
 
   bool stopped_;
   Scanner* scanner_;
+  CpuTimes* cpu_times_;
   Stopwatch sw_;
 
   DISALLOW_COPY_AND_ASSIGN(ScopedAddScannerTiming);
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index c0277af..2839d42 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -63,6 +63,7 @@
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/stringprintf.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/inbound_call.h"
 #include "kudu/rpc/remote_user.h"
 #include "kudu/rpc/rpc_context.h"
 #include "kudu/rpc/rpc_header.pb.h"
@@ -105,6 +106,7 @@
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 #include "kudu/util/status_callback.h"
+#include "kudu/util/stopwatch.h"
 #include "kudu/util/trace.h"
 #include "kudu/util/trace_metrics.h"
 
@@ -222,6 +224,8 @@ extern const char* CFILE_CACHE_HIT_BYTES_METRIC_NAME;
 
 namespace tserver {
 
+const char* SCANNER_BYTES_READ_METRIC_NAME = "scanner_bytes_read";
+
 namespace {
 
 // Lookup the given tablet, only ensuring that it exists.
@@ -692,6 +696,13 @@ class ScanResultCollector {
   //
   // Does nothing by default.
   virtual void set_row_format_flags(uint64_t /* row_format_flags */) {}
+
+  CpuTimes* cpu_times() {
+    return &cpu_times_;
+  }
+
+ private:
+  CpuTimes cpu_times_;
 };
 
 namespace {
@@ -1606,11 +1617,26 @@ void TabletServiceImpl::ScannerKeepAlive(const ScannerKeepAliveRequestPB *req,
 }
 
 namespace {
-void SetResourceMetrics(ResourceMetricsPB* metrics, rpc::RpcContext* context) {
+void SetResourceMetrics(const rpc::RpcContext* context,
+                        const CpuTimes* cpu_times,
+                        ResourceMetricsPB* metrics) {
   metrics->set_cfile_cache_miss_bytes(
     context->trace()->metrics()->GetMetric(cfile::CFILE_CACHE_MISS_BYTES_METRIC_NAME));
   metrics->set_cfile_cache_hit_bytes(
     context->trace()->metrics()->GetMetric(cfile::CFILE_CACHE_HIT_BYTES_METRIC_NAME));
+
+  metrics->set_bytes_read(
+    context->trace()->metrics()->GetMetric(SCANNER_BYTES_READ_METRIC_NAME));
+
+  rpc::InboundCallTiming timing;
+  timing.time_handled = context->GetTimeHandled();
+  timing.time_received = context->GetTimeReceived();
+  timing.time_completed = MonoTime::Now();
+
+  metrics->set_queue_duration_nanos(timing.QueueDuration().ToNanoseconds());
+  metrics->set_total_duration_nanos(timing.TotalDuration().ToNanoseconds());
+  metrics->set_cpu_system_nanos(cpu_times->system);
+  metrics->set_cpu_user_nanos(cpu_times->user);
 }
 } // anonymous namespace
 
@@ -1618,6 +1644,7 @@ void TabletServiceImpl::Scan(const ScanRequestPB* req,
                              ScanResponsePB* resp,
                              rpc::RpcContext* context) {
   TRACE_EVENT0("tserver", "TabletServiceImpl::Scan");
+
   // Validate the request: user must pass a new_scan_request or
   // a scanner ID, but not both.
   if (PREDICT_FALSE(req->has_scanner_id() &&
@@ -1730,7 +1757,8 @@ void TabletServiceImpl::Scan(const ScanRequestPB* req,
     resp->set_last_primary_key(last.ToString());
   }
   resp->set_propagated_timestamp(server_->clock()->Now().ToUint64());
-  SetResourceMetrics(resp->mutable_resource_metrics(), context);
+
+  SetResourceMetrics(context, collector.cpu_times(), resp->mutable_resource_metrics());
   context->RespondSuccess();
 }
 
@@ -2017,8 +2045,9 @@ void TabletServiceImpl::Checksum(const ChecksumRequestPB* req,
 
   resp->set_checksum(collector.agg_checksum());
   resp->set_has_more_results(has_more);
-  SetResourceMetrics(resp->mutable_resource_metrics(), context);
   resp->set_rows_checksummed(collector.rows_checksummed());
+
+  SetResourceMetrics(context, collector.cpu_times(), resp->mutable_resource_metrics());
   context->RespondSuccess();
 }
 
@@ -2264,7 +2293,7 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
   // If we early-exit out of this function, automatically unregister
   // the scanner.
   ScopedUnregisterScanner unreg_scanner(server_->scanner_manager(), scanner->id());
-  ScopedAddScannerTiming scanner_timer(scanner.get());
+  ScopedAddScannerTiming scanner_timer(scanner.get(), result_collector->cpu_times());
 
   // Create the user's requested projection.
   // TODO(todd): Add test cases for bad projections including 0 columns.
@@ -2549,7 +2578,7 @@ Status TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req,
 
   // If we early-exit out of this function, automatically unregister the scanner.
   ScopedUnregisterScanner unreg_scanner(server_->scanner_manager(), scanner->id());
-  ScopedAddScannerTiming scanner_timer(scanner.get());
+  ScopedAddScannerTiming scanner_timer(scanner.get(), result_collector->cpu_times());
 
   VLOG(2) << "Found existing scanner " << scanner->id() << " for request: "
           << SecureShortDebugString(*req);
@@ -2661,6 +2690,7 @@ Status TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req,
 
   IteratorStats delta_stats = total_stats - scanner->already_reported_stats();
   scanner->set_already_reported_stats(total_stats);
+  TRACE_COUNTER_INCREMENT(SCANNER_BYTES_READ_METRIC_NAME, delta_stats.bytes_read);
 
   if (tablet) {
     tablet->metrics()->scanner_rows_scanned->IncrementBy(rows_scanned);
diff --git a/src/kudu/tserver/tserver.proto b/src/kudu/tserver/tserver.proto
index 439fca0..bbf7f43 100644
--- a/src/kudu/tserver/tserver.proto
+++ b/src/kudu/tserver/tserver.proto
@@ -364,9 +364,22 @@ message ScanRequestPB {
 
 // RPC's resource metrics.
 message ResourceMetricsPB {
-  // all metrics MUST be the type of int64.
+  // All metrics MUST be the type of int64.
+  // Number of bytes that were read because of a block cache miss.
   optional int64 cfile_cache_miss_bytes = 1;
+  // Number of bytes that were read from the block cache because of a hit.
   optional int64 cfile_cache_hit_bytes = 2;
+  // Number of bytes read from disk (or cache) by the scanner.
+  optional int64 bytes_read = 3;
+  // Total time taken between scan rpc requests being accepted and when they were handled in
+  // nanoseconds for this scanner.
+  optional int64 queue_duration_nanos = 4;
+  // Total time taken for all scan rpc requests to complete in nanoseconds for this scanner.
+  optional int64 total_duration_nanos = 5;
+  // Total elapsed CPU user time in nanoseconds for all scan rpc requests for this scanner.
+  optional int64 cpu_user_nanos = 6;
+  // Total elapsed CPU system time in nanoseconds for all scan rpc requests for this scanner.
+  optional int64 cpu_system_nanos = 7;
 }
 
 message ScanResponsePB {


[kudu] 02/02: [examples] use org.apache.kudu packages v1.11.1

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit c0508096153e313b616d0f26cac362e3a3196a5e
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Thu Nov 21 10:15:29 2019 -0800

    [examples] use org.apache.kudu packages v1.11.1
    
    Change-Id: Idfd5224b823c284d4680a1305834af80fc917828
    Reviewed-on: http://gerrit.cloudera.org:8080/14772
    Tested-by: Kudu Jenkins
    Reviewed-by: Grant Henke <gr...@apache.org>
---
 examples/java/collectl/README.adoc    | 2 +-
 examples/java/collectl/pom.xml        | 6 +++++-
 examples/java/insert-loadgen/pom.xml  | 6 +++++-
 examples/java/java-example/pom.xml    | 2 +-
 examples/quickstart/nifi/README.adoc  | 4 ++--
 examples/quickstart/spark/README.adoc | 2 +-
 examples/scala/spark-example/pom.xml  | 5 ++++-
 7 files changed, 19 insertions(+), 8 deletions(-)

diff --git a/examples/java/collectl/README.adoc b/examples/java/collectl/README.adoc
index 32a0375..72b317c 100644
--- a/examples/java/collectl/README.adoc
+++ b/examples/java/collectl/README.adoc
@@ -87,7 +87,7 @@ If you have Spark available, you can also look at the data in Kudu using Spark,
 
 [source,bash]
 ----
-$ spark2-shell --packages org.apache.kudu:kudu-spark2_2.11:1.7.0
+$ spark2-shell --packages org.apache.kudu:kudu-spark2_2.11:1.11.1
 ----
 
 You can then modify this example script to query the data with SparkSQL:
diff --git a/examples/java/collectl/pom.xml b/examples/java/collectl/pom.xml
index 9684e48..3fb7bc9 100644
--- a/examples/java/collectl/pom.xml
+++ b/examples/java/collectl/pom.xml
@@ -27,6 +27,10 @@
   <version>1.0-SNAPSHOT</version>
   <name>Kudu collectl Example</name>
 
+  <properties>
+    <kudu-version>1.11.1</kudu-version>
+  </properties>
+
   <build>
     <plugins>
       <plugin>
@@ -65,7 +69,7 @@
     <dependency>
       <groupId>org.apache.kudu</groupId>
       <artifactId>kudu-client</artifactId>
-      <version>1.7.0</version>
+      <version>${kudu-version}</version>
     </dependency>
 
     <!-- For logging messages. -->
diff --git a/examples/java/insert-loadgen/pom.xml b/examples/java/insert-loadgen/pom.xml
index b988465..ac9d643 100644
--- a/examples/java/insert-loadgen/pom.xml
+++ b/examples/java/insert-loadgen/pom.xml
@@ -27,6 +27,10 @@
   <version>1.0-SNAPSHOT</version>
   <name>Random Insert Load Generator for Kudu</name>
 
+  <properties>
+    <kudu-version>1.11.1</kudu-version>
+  </properties>
+
   <build>
     <plugins>
       <plugin>
@@ -65,7 +69,7 @@
     <dependency>
       <groupId>org.apache.kudu</groupId>
       <artifactId>kudu-client</artifactId>
-      <version>1.7.0</version>
+      <version>${kudu-version}</version>
     </dependency>
 
     <!-- For logging messages. -->
diff --git a/examples/java/java-example/pom.xml b/examples/java/java-example/pom.xml
index 04762dc..f7482bf 100644
--- a/examples/java/java-example/pom.xml
+++ b/examples/java/java-example/pom.xml
@@ -28,7 +28,7 @@
   <name>Kudu Java Client Examples</name>
 
   <properties>
-    <kudu-version>1.11.0</kudu-version>
+    <kudu-version>1.11.1</kudu-version>
   </properties>
 
   <build>
diff --git a/examples/quickstart/nifi/README.adoc b/examples/quickstart/nifi/README.adoc
index 260534b..0f7c79d 100644
--- a/examples/quickstart/nifi/README.adoc
+++ b/examples/quickstart/nifi/README.adoc
@@ -55,7 +55,7 @@ docker run -it --rm --network="docker_default" maven:latest bin/bash
 # Download the kudu-client-tools jar which has the kudu-client and all the dependencies.
 mkdir jars
 mvn dependency:copy \
-    -Dartifact=org.apache.kudu:kudu-client-tools:1.10.0 \
+    -Dartifact=org.apache.kudu:kudu-client-tools:1.11.1 \
     -DoutputDirectory=jars
 # Run the jshell with the jar on the classpath.
 jshell --class-path jars/*
@@ -163,7 +163,7 @@ example of the code to allow you to query the `random_user` table:
 
 [source,bash]
 ----
-spark-shell --packages org.apache.kudu:kudu-spark2_2.11:1.10.0
+spark-shell --packages org.apache.kudu:kudu-spark2_2.11:1.11.1
 ----
 
 [source,scala]
diff --git a/examples/quickstart/spark/README.adoc b/examples/quickstart/spark/README.adoc
index 1722778..e833838 100644
--- a/examples/quickstart/spark/README.adoc
+++ b/examples/quickstart/spark/README.adoc
@@ -69,7 +69,7 @@ Run the `spark-shell` with the `kudu-spark` package:
 
 [source,bash]
 ----
-spark-shell --packages org.apache.kudu:kudu-spark2_2.11:1.10.0
+spark-shell --packages org.apache.kudu:kudu-spark2_2.11:1.11.1
 ----
 
 NOTE: The examples below assume you are in the `spark-shell` with the
diff --git a/examples/scala/spark-example/pom.xml b/examples/scala/spark-example/pom.xml
index ed310a7..e62171a 100644
--- a/examples/scala/spark-example/pom.xml
+++ b/examples/scala/spark-example/pom.xml
@@ -27,6 +27,9 @@
   <version>1.0-SNAPSHOT</version>
   <name>Kudu Spark Examples</name>
 
+  <properties>
+    <kudu-version>1.11.1</kudu-version>
+  </properties>
 
   <build>
     <plugins>
@@ -93,7 +96,7 @@
     <dependency>
       <groupId>org.apache.kudu</groupId>
       <artifactId>kudu-spark2_2.11</artifactId>
-      <version>1.7.1</version>
+      <version>${kudu-version}</version>
     </dependency>
 
     <!-- The Spark dependencies are provided. -->