Posted to commits@hive.apache.org by st...@apache.org on 2018/06/07 19:30:33 UTC

hive git commit: HIVE-18690: Integrate with Spark OutputMetrics (Sahil Takiar, reviewed by Aihua Xu)

Repository: hive
Updated Branches:
  refs/heads/master 473dd0462 -> f04eba3ca


HIVE-18690: Integrate with Spark OutputMetrics (Sahil Takiar, reviewed by Aihua Xu)
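
This change propagates the row and byte counters from Hive's FileSinkOperator
into Spark's per-task OutputMetrics, so records written and bytes written show
up alongside the existing input and shuffle statistics. A minimal sketch of the
Spark API the patch builds on (the class and method names here are illustrative;
the calls mirror those added in SparkMetricUtils below):

    import org.apache.spark.TaskContext;

    // Illustrative sketch only. TaskContext.get() is thread-local and non-null
    // only inside a task running on an executor; on the driver it returns null.
    public final class OutputMetricsSketch {
      public static void reportWrite(long records, long bytes) {
        TaskContext ctx = TaskContext.get();
        if (ctx != null) {
          ctx.taskMetrics().outputMetrics().setRecordsWritten(records);
          ctx.taskMetrics().outputMetrics().setBytesWritten(bytes);
        }
      }
    }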


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f04eba3c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f04eba3c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f04eba3c

Branch: refs/heads/master
Commit: f04eba3cac3007bfe61ec2f57c92d404f4d40b4c
Parents: 473dd04
Author: Sahil Takiar <ta...@gmail.com>
Authored: Tue Feb 27 20:21:55 2018 -0800
Committer: Sahil Takiar <st...@cloudera.com>
Committed: Thu Jun 7 14:29:59 2018 -0500

----------------------------------------------------------------------
 .../hive/ql/exec/spark/TestSparkStatistics.java |  2 +-
 .../hadoop/hive/ql/exec/FileSinkOperator.java   | 16 ++++-
 .../hive/ql/exec/spark/SparkMetricUtils.java    | 63 ++++++++++++++++++++
 .../spark/Statistic/SparkStatisticsNames.java   |  3 +
 .../spark/status/impl/SparkMetricsUtils.java    |  4 ++
 .../hive/spark/client/MetricsCollection.java    | 20 ++++++-
 .../hive/spark/client/metrics/Metrics.java      | 16 ++++-
 .../spark/client/metrics/OutputMetrics.java     | 57 ++++++++++++++++++
 .../spark/client/TestMetricsCollection.java     | 14 +++--
 9 files changed, 183 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/f04eba3c/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java
index f6c5b17..d383873 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java
@@ -81,7 +81,7 @@ public class TestSparkStatistics {
       List<SparkStatistic> sparkStats = Lists.newArrayList(sparkTask.getSparkStatistics()
               .getStatisticGroup(SparkStatisticsNames.SPARK_GROUP_NAME).getStatistics());
 
-      Assert.assertEquals(24, sparkStats.size());
+      Assert.assertEquals(26, sparkStats.size());
 
       Map<String, String> statsMap = sparkStats.stream().collect(
               Collectors.toMap(SparkStatistic::getName, SparkStatistic::getValue));
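
The expected statistic count rises from 24 to 26 because two new entries,
RecordsWritten and BytesWritten, now appear in the SPARK statistics group. A
hypothetical follow-on assertion against the statsMap built above (not part of
this patch) would look like:

    // Hypothetical assertions, keyed by the constants added to
    // SparkStatisticsNames later in this commit.
    Assert.assertTrue(statsMap.containsKey(SparkStatisticsNames.RECORDS_WRITTEN));
    Assert.assertTrue(statsMap.containsKey(SparkStatisticsNames.BYTES_WRITTEN));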

http://git-wip-us.apache.org/repos/asf/hive/blob/f04eba3c/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 83b53f4..c2319bb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -30,6 +30,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.function.BiFunction;
 
 import com.google.common.collect.Lists;
 
@@ -37,6 +38,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -46,6 +48,7 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Utilities.MissingBucketsContext;
+import org.apache.hadoop.hive.ql.exec.spark.SparkMetricUtils;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
@@ -79,16 +82,18 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspecto
 import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyShim;
 import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyValue;
 import org.apache.hadoop.hive.shims.ShimLoader;
+
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.ReflectionUtils;
+
 import org.apache.hive.common.util.HiveStringUtils;
-import org.apache.hive.common.util.Murmur3;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.function.BiFunction;
+
 /**
  * File Sink operator implementation.
  **/
@@ -1228,6 +1233,10 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     row_count.set(numRows);
     LOG.info(toString() + ": records written - " + numRows);
 
+    if ("spark".equalsIgnoreCase(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE))) {
+      SparkMetricUtils.updateSparkRecordsWrittenMetrics(runTimeNumRows);
+    }
+
     if (!bDynParts && !filesCreated) {
       boolean skipFiles = "tez".equalsIgnoreCase(
           HiveConf.getVar(hconf, ConfVars.HIVE_EXECUTION_ENGINE));
@@ -1303,6 +1312,9 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
         if (isNativeTable()) {
           fsp.commit(fs, commitPaths);
         }
+        if ("spark".equals(HiveConf.getVar(hconf, ConfVars.HIVE_EXECUTION_ENGINE))) {
+          SparkMetricUtils.updateSparkBytesWrittenMetrics(LOG, fs, fsp.finalPaths);
+        }
       }
       if (conf.isMmTable()) {
         Utilities.writeMmCommitManifest(commitPaths, specPath, fs, taskId,

http://git-wip-us.apache.org/repos/asf/hive/blob/f04eba3c/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMetricUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMetricUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMetricUtils.java
new file mode 100644
index 0000000..1f856ae
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMetricUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.TaskContext;
+
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * Utility class for updating Spark-level metrics.
+ */
+public final class SparkMetricUtils {
+
+  private SparkMetricUtils() {
+    // Do nothing
+  }
+
+  public static void updateSparkRecordsWrittenMetrics(long numRows) {
+    TaskContext taskContext = TaskContext.get();
+    if (taskContext != null && numRows > 0) {
+      taskContext.taskMetrics().outputMetrics().setRecordsWritten(numRows);
+    }
+  }
+
+  public static void updateSparkBytesWrittenMetrics(Logger log, FileSystem fs, Path[]
+          commitPaths) {
+    AtomicLong bytesWritten = new AtomicLong();
+    Arrays.stream(commitPaths).parallel().forEach(path -> {
+      try {
+        bytesWritten.addAndGet(fs.getFileStatus(path).getLen());
+      } catch (IOException e) {
+        log.debug("Unable to collect stats for file: " + path + " output metrics may be inaccurate",
+                e);
+      }
+    });
+    if (TaskContext.get() != null && bytesWritten.get() > 0) {
+      TaskContext.get().taskMetrics().outputMetrics().setBytesWritten(bytesWritten.get());
+    }
+  }
+}
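
The parallel stream in updateSparkBytesWrittenMetrics is simply a concurrent
sum of committed file lengths; a sequential equivalent, shown for clarity only,
would be:

    // Sequential equivalent of the parallel sum above (illustrative only).
    long bytesWritten = 0L;
    for (Path path : commitPaths) {
      try {
        bytesWritten += fs.getFileStatus(path).getLen();
      } catch (IOException e) {
        log.debug("Unable to collect stats for file: " + path, e);
      }
    }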

http://git-wip-us.apache.org/repos/asf/hive/blob/f04eba3c/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java
index 12c3eac..b80a8f4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java
@@ -54,5 +54,8 @@ public class SparkStatisticsNames {
   public static final String SHUFFLE_WRITE_TIME = "ShuffleWriteTime";
   public static final String SHUFFLE_RECORDS_WRITTEN = "ShuffleRecordsWritten";
 
+  public static final String RECORDS_WRITTEN = "RecordsWritten";
+  public static final String BYTES_WRITTEN = "BytesWritten";
+
   public static final String SPARK_GROUP_NAME = "SPARK";
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/f04eba3c/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
index c73c150..a0a0330 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
@@ -68,6 +68,10 @@ public final class SparkMetricsUtils {
               allMetrics.shuffleWriteMetrics.shuffleRecordsWritten);
       results.put(SparkStatisticsNames.SHUFFLE_WRITE_TIME, allMetrics.shuffleWriteMetrics.shuffleWriteTime);
     }
+    if (allMetrics.outputMetrics != null) {
+      results.put(SparkStatisticsNames.BYTES_WRITTEN, allMetrics.outputMetrics.bytesWritten);
+      results.put(SparkStatisticsNames.RECORDS_WRITTEN, allMetrics.outputMetrics.recordsWritten);
+    }
     return results;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/f04eba3c/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
index a0db015..2be19de 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
@@ -27,6 +27,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 import org.apache.hive.spark.client.metrics.InputMetrics;
 import org.apache.hive.spark.client.metrics.Metrics;
+import org.apache.hive.spark.client.metrics.OutputMetrics;
 import org.apache.hive.spark.client.metrics.ShuffleReadMetrics;
 import org.apache.hive.spark.client.metrics.ShuffleWriteMetrics;
 
@@ -171,6 +172,11 @@ public class MetricsCollection {
       long shuffleWriteTime = 0L;
       long shuffleRecordsWritten = 0L;
 
+      // Output metrics.
+      boolean hasOutputMetrics = false;
+      long bytesWritten = 0L;
+      long recordsWritten = 0L;
+
       for (TaskInfo info : Collections2.filter(taskMetrics, filter)) {
         Metrics m = info.metrics;
         executorDeserializeTime += m.executorDeserializeTime;
@@ -206,6 +212,12 @@ public class MetricsCollection {
           shuffleWriteTime += m.shuffleWriteMetrics.shuffleWriteTime;
           shuffleRecordsWritten += m.shuffleWriteMetrics.shuffleRecordsWritten;
         }
+
+        if (m.outputMetrics != null) {
+          hasOutputMetrics = true;
+          bytesWritten += m.outputMetrics.bytesWritten;
+          recordsWritten += m.outputMetrics.recordsWritten;
+        }
       }
 
       InputMetrics inputMetrics = null;
@@ -233,6 +245,11 @@ public class MetricsCollection {
           shuffleRecordsWritten);
       }
 
+      OutputMetrics outputMetrics = null;
+      if (hasOutputMetrics) {
+        outputMetrics = new OutputMetrics(bytesWritten, recordsWritten);
+      }
+
       return new Metrics(
         executorDeserializeTime,
         executorDeserializeCpuTime,
@@ -246,7 +263,8 @@ public class MetricsCollection {
         taskDurationTime,
         inputMetrics,
         shuffleReadMetrics,
-        shuffleWriteMetrics);
+        shuffleWriteMetrics,
+        outputMetrics);
     } finally {
         lock.readLock().unlock();
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/f04eba3c/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
index cf7a1f6..e09effc 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
@@ -63,10 +63,12 @@ public class Metrics implements Serializable {
   public final ShuffleReadMetrics shuffleReadMetrics;
   /** If tasks wrote to shuffle output, metrics on the written shuffle data. */
   public final ShuffleWriteMetrics shuffleWriteMetrics;
+  /** If tasks wrote output data, metrics on the data written to external systems. */
+  public final OutputMetrics outputMetrics;
 
   private Metrics() {
     // For Serialization only.
-    this(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, null, null, null);
+    this(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, null, null, null, null);
   }
 
   public Metrics(
@@ -82,7 +84,8 @@ public class Metrics implements Serializable {
       long taskDurationTime,
       InputMetrics inputMetrics,
       ShuffleReadMetrics shuffleReadMetrics,
-      ShuffleWriteMetrics shuffleWriteMetrics) {
+      ShuffleWriteMetrics shuffleWriteMetrics,
+      OutputMetrics outputMetrics) {
     this.executorDeserializeTime = executorDeserializeTime;
     this.executorDeserializeCpuTime = executorDeserializeCpuTime;
     this.executorRunTime = executorRunTime;
@@ -96,6 +99,7 @@ public class Metrics implements Serializable {
     this.inputMetrics = inputMetrics;
     this.shuffleReadMetrics = shuffleReadMetrics;
     this.shuffleWriteMetrics = shuffleWriteMetrics;
+    this.outputMetrics = outputMetrics;
   }
 
   public Metrics(TaskMetrics metrics, TaskInfo taskInfo) {
@@ -112,7 +116,8 @@ public class Metrics implements Serializable {
       taskInfo.duration(),
       optionalInputMetric(metrics),
       optionalShuffleReadMetric(metrics),
-      optionalShuffleWriteMetrics(metrics));
+      optionalShuffleWriteMetrics(metrics),
+      optionalOutputMetrics(metrics));
   }
 
   private static InputMetrics optionalInputMetric(TaskMetrics metrics) {
@@ -127,6 +132,10 @@ public class Metrics implements Serializable {
     return (metrics.shuffleWriteMetrics() != null) ? new ShuffleWriteMetrics(metrics) : null;
   }
 
+  private static OutputMetrics optionalOutputMetrics(TaskMetrics metrics) {
+    return (metrics.outputMetrics() != null) ? new OutputMetrics(metrics) : null;
+  }
+
   @Override
   public String toString() {
     return "Metrics{" +
@@ -143,6 +152,7 @@ public class Metrics implements Serializable {
             ", inputMetrics=" + inputMetrics +
             ", shuffleReadMetrics=" + shuffleReadMetrics +
             ", shuffleWriteMetrics=" + shuffleWriteMetrics +
+            ", outputMetrics=" + outputMetrics +
             '}';
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/f04eba3c/spark-client/src/main/java/org/apache/hive/spark/client/metrics/OutputMetrics.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/OutputMetrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/OutputMetrics.java
new file mode 100644
index 0000000..99516ff
--- /dev/null
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/OutputMetrics.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client.metrics;
+
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.spark.executor.TaskMetrics;
+
+import java.io.Serializable;
+
+/**
+ * Metrics pertaining to writing data.
+ */
+@InterfaceAudience.Private
+public class OutputMetrics implements Serializable {
+
+  /** Total number of bytes written. */
+  public final long bytesWritten;
+  /** Total number of records written. */
+  public final long recordsWritten;
+
+  private OutputMetrics() {
+    // For Serialization only.
+    this(0L, 0L);
+  }
+
+  public OutputMetrics(long bytesWritten, long recordsWritten) {
+    this.bytesWritten = bytesWritten;
+    this.recordsWritten = recordsWritten;
+  }
+
+  public OutputMetrics(TaskMetrics metrics) {
+    this(metrics.outputMetrics().bytesWritten(), metrics.outputMetrics().recordsWritten());
+  }
+
+  @Override
+  public String toString() {
+    return "OutputMetrics{" +
+            "bytesWritten=" + bytesWritten +
+            ", recordsWritten=" + recordsWritten +
+            '}';
+  }
+}
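
Aggregating these immutable POJOs, as MetricsCollection does earlier in this
commit, amounts to summing the two fields across tasks. A self-contained sketch
(class and method names invented for the example):

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hive.spark.client.metrics.OutputMetrics;

    // Illustrative only: sums per-task OutputMetrics the way the
    // aggregation in MetricsCollection does.
    final class OutputMetricsSum {
      static OutputMetrics sum(List<OutputMetrics> perTask) {
        long bytes = 0L;
        long records = 0L;
        for (OutputMetrics m : perTask) {
          bytes += m.bytesWritten;
          records += m.recordsWritten;
        }
        return new OutputMetrics(bytes, records);
      }

      public static void main(String[] args) {
        OutputMetrics total = sum(Arrays.asList(
            new OutputMetrics(100L, 2L), new OutputMetrics(50L, 1L)));
        System.out.println(total); // OutputMetrics{bytesWritten=150, recordsWritten=3}
      }
    }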

http://git-wip-us.apache.org/repos/asf/hive/blob/f04eba3c/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
----------------------------------------------------------------------
diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
index 2d4c43d..a5c13ca 100644
--- a/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
+++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
@@ -26,6 +26,7 @@ import java.util.Arrays;
 import org.apache.hive.spark.client.metrics.DataReadMethod;
 import org.apache.hive.spark.client.metrics.InputMetrics;
 import org.apache.hive.spark.client.metrics.Metrics;
+import org.apache.hive.spark.client.metrics.OutputMetrics;
 import org.apache.hive.spark.client.metrics.ShuffleReadMetrics;
 import org.apache.hive.spark.client.metrics.ShuffleWriteMetrics;
 import org.junit.Test;
@@ -67,7 +68,7 @@ public class TestMetricsCollection {
   public void testOptionalMetrics() {
     long value = taskValue(1, 1, 1L);
     Metrics metrics = new Metrics(value, value, value, value, value, value, value, value, value,
-            value, null, null, null);
+            value, null, null, null, null);
 
     MetricsCollection collection = new MetricsCollection();
     for (int i : Arrays.asList(1, 2)) {
@@ -96,9 +97,9 @@ public class TestMetricsCollection {
     long value = taskValue(1, 1, 1);
 
     Metrics metrics1 = new Metrics(value, value, value, value, value, value, value, value, value,
-            value, new InputMetrics(value, value), null, null);
+            value, new InputMetrics(value, value), null, null, null);
     Metrics metrics2 = new Metrics(value, value, value, value, value, value, value, value, value,
-            value, new InputMetrics(value, value), null, null);
+            value, new InputMetrics(value, value), null, null, null);
 
     collection.addMetrics(1, 1, 1, metrics1);
     collection.addMetrics(1, 1, 2, metrics2);
@@ -112,7 +113,8 @@ public class TestMetricsCollection {
     return new Metrics(value, value, value, value, value, value, value, value, value, value,
       new InputMetrics(value, value),
       new ShuffleReadMetrics((int) value, (int) value, value, value, value, value, value),
-      new ShuffleWriteMetrics(value, value, value));
+      new ShuffleWriteMetrics(value, value, value),
+      new OutputMetrics(value, value));
   }
 
   /**
@@ -173,6 +175,8 @@ public class TestMetricsCollection {
     assertEquals(expected, metrics.shuffleWriteMetrics.shuffleBytesWritten);
     assertEquals(expected, metrics.shuffleWriteMetrics.shuffleWriteTime);
     assertEquals(expected, metrics.shuffleWriteMetrics.shuffleRecordsWritten);
-  }
 
+    assertEquals(expected, metrics.outputMetrics.recordsWritten);
+    assertEquals(expected, metrics.outputMetrics.bytesWritten);
+  }
 }