Posted to commits@kudu.apache.org by da...@apache.org on 2017/09/20 21:26:43 UTC

[1/2] kudu git commit: parse_metrics_log: merge metrics across entities of same type

Repository: kudu
Updated Branches:
  refs/heads/master 3e6e12a9f -> 5a0015e75


parse_metrics_log: merge metrics across entities of same type

This changes the tool so that all of the tablet metrics get summed
together before reporting. Previously we just reported the info from an
arbitrary tablet (probably the last one to be listed in the log).

This also adds a short circuit so that we skip parsing and merging of
metrics that have not been slated for output. This sped up processing of
a metrics file by roughly 10x when reporting only one or two metrics from it.

Lastly, this adds reporting of the max value from histograms.
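
For illustration, the summing and skip-parsing behavior boils down to
something like the following sketch. It is not the script itself: the
entity layout and the selected metric are simplified for the example,
and the real logic lives in json_to_map() and merge_delta().

  # Metrics not slated for output are skipped outright; everything else is
  # summed across entities of the same type (here, two tablets).
  WANTED = set(["tablet.rows_inserted"])

  entities = [
      {"type": "tablet", "id": "t1", "metrics": [{"name": "rows_inserted", "value": 100}]},
      {"type": "tablet", "id": "t2", "metrics": [{"name": "rows_inserted", "value": 250},
                                                 {"name": "memrowset_size", "value": 64}]},
  ]

  merged = {}
  for entity in entities:
      for m in entity["metrics"]:
          key = entity["type"] + "." + m["name"]
          if key not in WANTED:
              continue  # the short circuit: never parsed or merged
          merged[key] = merged.get(key, 0) + m["value"]

  print(merged)  # {'tablet.rows_inserted': 350}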

Change-Id: I442cddd4c5352dcfae30fa23888082d916069d5f
Reviewed-on: http://gerrit.cloudera.org:8080/8062
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/3ab3f450
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/3ab3f450
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/3ab3f450

Branch: refs/heads/master
Commit: 3ab3f45094c32d903ea179be959e4c862b05f668
Parents: 3e6e12a
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Sep 13 17:33:43 2017 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Sep 20 20:23:20 2017 +0000

----------------------------------------------------------------------
 src/kudu/scripts/parse_metrics_log.py | 111 +++++++++++++++++++++++------
 1 file changed, 91 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/3ab3f450/src/kudu/scripts/parse_metrics_log.py
----------------------------------------------------------------------
diff --git a/src/kudu/scripts/parse_metrics_log.py b/src/kudu/scripts/parse_metrics_log.py
index 30c1157..3b58991 100644
--- a/src/kudu/scripts/parse_metrics_log.py
+++ b/src/kudu/scripts/parse_metrics_log.py
@@ -23,31 +23,47 @@ and outputs a TSV file including some metrics.
 This isn't meant to be used standalone as written, but rather as a template
 which is edited based on whatever metrics you'd like to extract. The set
 of metrics described below are just a starting point to work from.
+Uncomment the ones you are interested in, or add new ones.
 """
 
+from collections import Counter
 import gzip
+import heapq
+import itertools
 try:
   import simplejson as json
 except:
   import json
 import sys
 
+# Even if the input data has very frequent metrics, for graphing purposes we
+# may not want to look at such fine-grained data. This constant can be set
+# to drop samples which were measured within a given number of seconds
+# from the prior sample.
+GRANULARITY_SECS = 30
+
 # These metrics will be extracted "as-is" into the TSV.
 # The first element of each tuple is the metric name.
 # The second is the name that will be used in the TSV header line.
 SIMPLE_METRICS = [
-  ("server.generic_current_allocated_bytes", "heap_allocated"),
-  ("server.log_block_manager_bytes_under_management", "bytes_on_disk"),
-  ("tablet.memrowset_size", "mrs_size"),
-  ("server.block_cache_usage", "bc_usage"),
+  #  ("server.generic_current_allocated_bytes", "heap_allocated"),
+  #  ("server.log_block_manager_bytes_under_management", "bytes_on_disk"),
+  #  ("tablet.memrowset_size", "mrs_size"),
+  #  ("server.block_cache_usage", "bc_usage"),
 ]
 
 # These metrics will be extracted as per-second rates into the TSV.
 RATE_METRICS = [
-  ("server.block_manager_total_bytes_read", "bytes_r_per_sec"),
-  ("server.block_manager_total_bytes_written", "bytes_w_per_sec"),
-  ("server.block_cache_lookups", "bc_lookups_per_sec"),
-  ("tablet.rows_inserted", "inserts_per_sec"),
+  #  ("server.block_manager_total_bytes_read", "bytes_r_per_sec"),
+  #  ("server.block_manager_total_bytes_written", "bytes_w_per_sec"),
+  #  ("server.block_cache_lookups", "bc_lookups_per_sec"),
+  #  ("server.cpu_utime", "cpu_utime"),
+  #  ("server.cpu_stime", "cpu_stime"),
+  #  ("server.involuntary_context_switches", "invol_cs"),
+  #  ("server.voluntary_context_switches", "vol_cs"),
+  #  ("tablet.rows_inserted", "inserts_per_sec"),
+  #  ("tablet.rows_upserted", "upserts_per_sec"),
+  #  ("server.tcmalloc_contention_time", "tcmalloc_contention_time")
 ]
 
 # These metrics will be extracted as percentile metrics into the TSV.
@@ -55,12 +71,55 @@ RATE_METRICS = [
 # percentile numbers suffixed to the column name provided here (foo_p95,
 # foo_p99, etc)
 HISTOGRAM_METRICS = [
-  ("server.handler_latency_kudu_tserver_TabletServerService_Write", "write"),
-  ("tablet.log_append_latency", "log")
+  #  ("server.op_apply_run_time", "apply_run_time"),
+  #  ("server.handler_latency_kudu_tserver_TabletServerService_Write", "write"),
+  #  ("server.handler_latency_kudu_consensus_ConsensusService_UpdateConsensus", "cons_update"),
+  #  ("server.handler_latency_kudu_consensus_ConsensusService_RequestVote", "vote"),
+  #  ("server.handler_latency_kudu_tserver_TabletCopyService_FetchData", "fetch_data"),
+  #  ("tablet.bloom_lookups_per_op", "bloom_lookups"),
+  #  ("tablet.log_append_latency", "log"),
+  #  ("tablet.op_prepare_run_time", "prep"),
+  #  ("tablet.write_op_duration_client_propagated_consistency", "op_dur")
 ]
 
+# Get the set of metrics we actually want to bother parsing from the log.
+PARSE_METRIC_KEYS = set(key for (key, _) in (SIMPLE_METRICS + RATE_METRICS + HISTOGRAM_METRICS))
+
+# The script always reports cache-hit metrics.
+PARSE_METRIC_KEYS.add("server.block_cache_hits_caching")
+PARSE_METRIC_KEYS.add("server.block_cache_misses_caching")
+
 NaN = float('nan')
-UNKNOWN_PERCENTILES = dict(p50=NaN, p95=NaN, p99=NaN, p999=NaN)
+UNKNOWN_PERCENTILES = dict(p50=NaN, p95=NaN, p99=NaN, p999=NaN, max=NaN)
+
+def merge_delta(m, delta):
+  """
+  Update (in-place) the metrics entry 'm' by merging another entry 'delta'.
+
+  Counts and sums are simply added.
+  Histograms require more complex processing: the 'values' array needs to be
+  merged and then the delta's counts added to the corresponding buckets.
+  """
+
+  for k, v in delta.iteritems():
+    if k in ('name', 'values', 'counts', 'min', 'max', 'mean'):
+      continue
+    m[k] += v
+
+  # Merge counts.
+  if 'counts' in delta:
+    m_zip = itertools.izip(m.get('values', []), m.get('counts', []))
+    d_zip = itertools.izip(delta.get('values', []), delta.get('counts', []))
+    new_values = []
+    new_counts = []
+    for value, counts in itertools.groupby(heapq.merge(m_zip, d_zip), lambda x: x[0]):
+      new_values.append(value)
+      new_counts.append(sum(c for v, c in counts))
+    m['counts'] = new_counts
+    m['values'] = new_values
+
 
 def json_to_map(j):
   """
@@ -68,15 +127,19 @@ def json_to_map(j):
   keyed by <entity>.<metric name>.
 
   The entity ID is currently ignored. If there is more than one
-  entity of a given type (eg tables), it is undefined which one
-  will be reflected in the output metrics.
-
-  TODO: add some way to specify a particular tablet to parse out.
+  entity of a given type (eg tablets), the metrics will be summed
+  together using 'merge_delta' above.
   """
   ret = {}
   for entity in j:
     for m in entity['metrics']:
-      ret[entity['type'] + "." + m['name']] = m
+      key = entity['type'] + "." + m['name']
+      if key not in PARSE_METRIC_KEYS:
+        continue
+      if key in ret:
+        merge_delta(ret[key], m)
+      else:
+        ret[key] = m
   return ret
 
 def delta(prev, cur, m):
@@ -98,7 +161,7 @@ def histogram_stats(prev, cur, m):
   p_dict = dict(zip(prev.get('values', []),
                     prev.get('counts', [])))
   c_zip = zip(cur.get('values', []),
-                    cur.get('counts', []))
+              cur.get('counts', []))
   delta_total = cur['total_count'] - prev['total_count']
   if delta_total == 0:
     return UNKNOWN_PERCENTILES
@@ -117,6 +180,8 @@ def histogram_stats(prev, cur, m):
       res['p99'] = cur_val
     if 'p999' not in res and percentile > 0.999:
       res['p999'] = cur_val
+    if cum_count == delta_total and delta_count != 0:
+      res['max'] = cur_val
   return res
 
 def cache_hit_ratio(prev, cur):
@@ -145,7 +210,7 @@ def process(prev, cur):
   calc_vals.extend(delta(prev, cur, metric)/delta_ts for (metric, _) in RATE_METRICS)
   for metric, _ in HISTOGRAM_METRICS:
     stats = histogram_stats(prev, cur, metric)
-    calc_vals.extend([stats['p50'], stats['p95'], stats['p99'], stats['p999']])
+    calc_vals.extend([stats['p50'], stats['p95'], stats['p99'], stats['p999'], stats['max']])
 
   print (cur['ts'] + prev['ts'])/2, \
         cache_ratio, \
@@ -160,6 +225,7 @@ def main(argv):
     simple_headers.append(header + "_p95")
     simple_headers.append(header + "_p99")
     simple_headers.append(header + "_p999")
+    simple_headers.append(header + "_max")
 
   print "time cache_hit_ratio", " ".join(simple_headers)
 
@@ -168,10 +234,15 @@ def main(argv):
       f = gzip.GzipFile(path)
     else:
       f = file(path)
-    for line in f:
+    for line_number, line in enumerate(f, start=1):
       (_, ts, metrics_json) = line.split(" ", 2)
       ts = float(ts) / 1000000.0
-      if prev_data and ts < prev_data['ts'] + 30:
+      prev_ts = prev_data['ts'] if prev_data else 0
+      # Enforce that the samples come in time-sorted order.
+      if ts <= prev_ts:
+        raise Exception("timestamps must be in ascending order (%f <= %f at %s:%d)"
+                        % (ts, prev_ts, path, line_number))
+      if prev_data and ts < prev_ts + GRANULARITY_SECS:
         continue
       data = json_to_map(json.loads(metrics_json))
       data['ts'] = ts
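
The 'max' column added above is derived from the histogram buckets rather
than from a dedicated field: walking the per-interval bucket deltas in
ascending value order, the largest bucket value that still carries a
nonzero delta is reported as the max. A standalone sketch of that walk
(bucket values and counts are made up; the real computation is in
histogram_stats() and also copes with buckets that only appear in the
newer sample):

  # Per-bucket counts are cumulative since process start, so the activity in
  # this reporting interval is the difference between the two samples.
  values      = [1, 2, 4, 8]      # histogram bucket values
  prev_counts = [0, 5, 2, 0]      # per-bucket counts at the previous sample
  cur_counts  = [0, 8, 3, 0]      # per-bucket counts at the current sample

  deltas = [c - p for c, p in zip(cur_counts, prev_counts)]   # [0, 3, 1, 0]
  delta_total = sum(deltas)

  cum = 0
  max_val = float("nan")
  for val, d in zip(values, deltas):
      cum += d
      if cum == delta_total and d != 0:
          max_val = val   # last bucket that saw any activity this interval
  print(max_val)          # 4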


[2/2] kudu git commit: kudu-spark: change default master address to local fqdn

Posted by da...@apache.org.
kudu-spark: change default master address to local fqdn

The previous default was 'localhost', which doesn't resolve to the Kudu
master in a cluster context. It would also cause issues on Kerberized clusters due
to 'localhost' not matching the Kerberos principal name of the master
(see KUDU-2142).
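
For code that previously leaned on the implicit 'localhost' default, the
safer pattern is to pass master addresses explicitly. A rough PySpark
sketch of that, against the same data source touched by this change (the
option keys 'kudu.master'/'kudu.table', the host names, and the table name
below are assumptions for the example, not taken from this commit):

  # Read a Kudu table with explicit master addresses rather than relying on
  # the connector's default (now the local machine's canonical host name).
  # Assumes the kudu-spark jar is on the Spark classpath.
  from pyspark.sql import SparkSession

  spark = SparkSession.builder.appName("kudu-read-example").getOrCreate()

  df = (spark.read
        .format("org.apache.kudu.spark.kudu")
        .option("kudu.master", "master-1.example.com:7051,master-2.example.com:7051")
        .option("kudu.table", "my_table")
        .load())
  df.show()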

Change-Id: I10ec7414c451f54b95d86663d743162688e304ba
Reviewed-on: http://gerrit.cloudera.org:8080/8072
Reviewed-by: Attila Bukor <ab...@cloudera.com>
Reviewed-by: Todd Lipcon <to...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/5a0015e7
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/5a0015e7
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/5a0015e7

Branch: refs/heads/master
Commit: 5a0015e75c6bde8ddb6f113083c3e7d46e48c5de
Parents: 3ab3f45
Author: Dan Burkert <da...@apache.org>
Authored: Thu Sep 14 11:43:38 2017 -0700
Committer: Dan Burkert <da...@apache.org>
Committed: Wed Sep 20 21:25:57 2017 +0000

----------------------------------------------------------------------
 .../apache/kudu/spark/tools/ImportExportFiles.scala | 10 +++++++---
 .../spark/tools/IntegrationTestBigLinkedList.scala  | 16 ++++++++++------
 .../org/apache/kudu/spark/kudu/DefaultSource.scala  | 10 ++++++----
 3 files changed, 23 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/5a0015e7/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/ImportExportFiles.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/ImportExportFiles.scala b/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/ImportExportFiles.scala
index 025b06b..a09b52d 100644
--- a/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/ImportExportFiles.scala
+++ b/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/ImportExportFiles.scala
@@ -17,9 +17,11 @@
 
 package org.apache.kudu.spark.tools
 
+import java.net.InetAddress
+
 import org.apache.kudu.client.KuduClient
 import org.apache.kudu.spark.tools.ImportExportKudu.ArgsCls
-import org.apache.spark.sql.{SQLContext, SparkSession}
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.SparkConf
 import org.slf4j.{Logger, LoggerFactory}
 import org.apache.kudu.spark.kudu._
@@ -33,13 +35,15 @@ object ImportExportKudu {
     sys.exit(1)
   }
 
+  def defaultMasterAddrs: String = InetAddress.getLocalHost.getCanonicalHostName
+
   def usage: String =
     s"""
        | Usage: --operation=import/export --format=<data-format(csv,parquet,avro)> --master-addrs=<master-addrs> --path=<path> --table-name=<table-name>
        |    where
        |      operation: import or export data from or to Kudu tables, default: import
        |      format: specify the format of data want to import/export, the following formats are supported csv,parquet,avro default:csv
-       |      masterAddrs: comma separated addresses of Kudu master nodes, default: localhost
+       |      masterAddrs: comma separated addresses of Kudu master nodes, default: $defaultMasterAddrs
        |      path: path to input or output for import/export operation, default: file://
        |      tableName: table name to import/export, default: ""
        |      columns: columns name for select statement on export from kudu table, default: *
@@ -49,7 +53,7 @@ object ImportExportKudu {
 
   case class ArgsCls(operation: String = "import",
                      format: String = "csv",
-                     masterAddrs: String = "localhost",
+                     masterAddrs: String = defaultMasterAddrs,
                      path: String = "file://",
                      tableName: String = "",
                      columns: String = "*",

http://git-wip-us.apache.org/repos/asf/kudu/blob/5a0015e7/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/IntegrationTestBigLinkedList.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/IntegrationTestBigLinkedList.scala b/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/IntegrationTestBigLinkedList.scala
index db03c24..7b1fc31 100644
--- a/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/IntegrationTestBigLinkedList.scala
+++ b/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/IntegrationTestBigLinkedList.scala
@@ -17,6 +17,8 @@
 
 package org.apache.kudu.spark.tools
 
+import java.net.InetAddress
+
 import com.google.common.annotations.VisibleForTesting
 import org.apache.kudu.client.SessionConfiguration.FlushMode
 import org.apache.kudu.client.{KuduClient, KuduSession, KuduTable}
@@ -81,6 +83,8 @@ object IntegrationTestBigLinkedList {
     else s"${n}ns"
   }
 
+  def defaultMasterAddrs: String = InetAddress.getLocalHost.getCanonicalHostName
+
   def main(args: Array[String]): Unit = {
     if (args.isEmpty) { fail(usage) }
 
@@ -94,7 +98,7 @@ object IntegrationTestBigLinkedList {
 }
 
 object Generator {
-  import IntegrationTestBigLinkedList.{LOG, fail, nanosToHuman, parseIntFlag}
+  import IntegrationTestBigLinkedList.{LOG, defaultMasterAddrs, fail, nanosToHuman, parseIntFlag}
 
   def usage: String =
     s"""
@@ -108,7 +112,7 @@ object Generator {
        |      hashPartitions: number of hash partitions to create for the new linked list table, if it doesn't exist, default: 1
        |      rangePartitions: number of range partitions to create for the new linked list table, if it doesn't exist, default: 1
        |      replicas: number of replicas to create for the new linked list table, if it doesn't exist, default: 1
-       |      master-addrs: comma separated addresses of Kudu master nodes, default: localhost
+       |      master-addrs: comma separated addresses of Kudu master nodes, default: $defaultMasterAddrs
        |      table-name: the name of the linked list table, default: $DEFAULT_TABLE_NAME
      """.stripMargin
 
@@ -118,7 +122,7 @@ object Generator {
                   hashPartitions: Int = 1,
                   rangePartitions: Int = 1,
                   replicas: Int = 1,
-                  masterAddrs: String = "localhost",
+                  masterAddrs: String = defaultMasterAddrs,
                   tableName: String = DEFAULT_TABLE_NAME)
 
   object Args {
@@ -267,19 +271,19 @@ object Generator {
 }
 
 object Verifier {
-  import IntegrationTestBigLinkedList.{fail, parseLongFlag}
+  import IntegrationTestBigLinkedList.{defaultMasterAddrs, fail, parseLongFlag}
 
   def usage: String =
     s"""
        | Usage: verify --nodes=<nodes> --master-addrs=<master-addrs> --table-name=<table-name>
        |    where
        |      nodes: number of nodes expected to be in the linked list table
-       |      master-addrs: comma separated addresses of Kudu master nodes, default: localhost
+       |      master-addrs: comma separated addresses of Kudu master nodes, default: $defaultMasterAddrs
        |      table-name: the name of the linked list table, default: $DEFAULT_TABLE_NAME
      """.stripMargin
 
   case class Args(nodes: Option[Long] = None,
-                  masterAddrs: String = "localhost",
+                  masterAddrs: String = defaultMasterAddrs,
                   tableName: String = DEFAULT_TABLE_NAME)
 
   object Args {

http://git-wip-us.apache.org/repos/asf/kudu/blob/5a0015e7/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
index 5c91a1f..fbc2d38 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
@@ -17,6 +17,7 @@
 
 package org.apache.kudu.spark.kudu
 
+import java.net.InetAddress
 import java.sql.Timestamp
 
 import scala.collection.JavaConverters._
@@ -49,6 +50,8 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider
   val OPERATION = "kudu.operation"
   val FAULT_TOLERANT_SCANNER = "kudu.faultTolerantScan"
 
+  def defaultMasterAddrs: String = InetAddress.getLocalHost.getCanonicalHostName
+
   /**
     * Construct a BaseRelation using the provided context and parameters.
     *
@@ -62,13 +65,12 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider
     val tableName = parameters.getOrElse(TABLE_KEY,
       throw new IllegalArgumentException(
         s"Kudu table name must be specified in create options using key '$TABLE_KEY'"))
-    val kuduMaster = parameters.getOrElse(KUDU_MASTER, "localhost")
+    val kuduMaster = parameters.getOrElse(KUDU_MASTER, defaultMasterAddrs)
     val operationType = getOperationType(parameters.getOrElse(OPERATION, "upsert"))
     val faultTolerantScanner = Try(parameters.getOrElse(FAULT_TOLERANT_SCANNER, "false").toBoolean)
       .getOrElse(false)
 
-    new KuduRelation(tableName, kuduMaster, faultTolerantScanner, operationType,
-      None)(sqlContext)
+    new KuduRelation(tableName, kuduMaster, faultTolerantScanner, operationType, None)(sqlContext)
   }
 
   /**
@@ -97,7 +99,7 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider
     val tableName = parameters.getOrElse(TABLE_KEY,
       throw new IllegalArgumentException(s"Kudu table name must be specified in create options " +
         s"using key '$TABLE_KEY'"))
-    val kuduMaster = parameters.getOrElse(KUDU_MASTER, "localhost")
+    val kuduMaster = parameters.getOrElse(KUDU_MASTER, defaultMasterAddrs)
     val operationType = getOperationType(parameters.getOrElse(OPERATION, "upsert"))
     val faultTolerantScanner = Try(parameters.getOrElse(FAULT_TOLERANT_SCANNER, "false").toBoolean)
       .getOrElse(false)