You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sz...@apache.org on 2014/11/19 22:43:02 UTC

svn commit: r1640633 - in /hive/branches/spark/spark-client/src: main/java/org/apache/hive/spark/client/MetricsCollection.java main/java/org/apache/hive/spark/client/metrics/Metrics.java test/java/org/apache/hive/spark/client/TestMetricsCollection.java

Author: szehon
Date: Wed Nov 19 21:43:02 2014
New Revision: 1640633

URL: http://svn.apache.org/r1640633
Log:
HIVE-8854 : Guava dependency conflict between hive driver and remote spark context [Spark Branch] (Marcelo Vanzin via Szehon)

Modified:
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
    hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java?rev=1640633&r1=1640632&r2=1640633&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java Wed Nov 19 21:43:02 2014
@@ -26,7 +26,6 @@ import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.google.common.base.Function;
-import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
 import com.google.common.collect.Collections2;
@@ -178,53 +177,50 @@ public class MetricsCollection {
         memoryBytesSpilled += m.memoryBytesSpilled;
         diskBytesSpilled += m.diskBytesSpilled;
 
-        if (m.inputMetrics.isPresent()) {
+        if (m.inputMetrics != null) {
           hasInputMetrics = true;
-          InputMetrics im = m.inputMetrics.get();
           if (readMethod == null) {
-            readMethod = im.readMethod;
-          } else if (readMethod != im.readMethod) {
+            readMethod = m.inputMetrics.readMethod;
+          } else if (readMethod != m.inputMetrics.readMethod) {
             readMethod = DataReadMethod.Multiple;
           }
-          bytesRead += im.bytesRead;
+          bytesRead += m.inputMetrics.bytesRead;
         }
 
-        if (m.shuffleReadMetrics.isPresent()) {
-          ShuffleReadMetrics srm = m.shuffleReadMetrics.get();
+        if (m.shuffleReadMetrics != null) {
           hasShuffleReadMetrics = true;
-          remoteBlocksFetched += srm.remoteBlocksFetched;
-          localBlocksFetched += srm.localBlocksFetched;
-          fetchWaitTime += srm.fetchWaitTime;
-          remoteBytesRead += srm.remoteBytesRead;
+          remoteBlocksFetched += m.shuffleReadMetrics.remoteBlocksFetched;
+          localBlocksFetched += m.shuffleReadMetrics.localBlocksFetched;
+          fetchWaitTime += m.shuffleReadMetrics.fetchWaitTime;
+          remoteBytesRead += m.shuffleReadMetrics.remoteBytesRead;
         }
 
-        if (m.shuffleWriteMetrics.isPresent()) {
-          ShuffleWriteMetrics swm = m.shuffleWriteMetrics.get();
+        if (m.shuffleWriteMetrics != null) {
           hasShuffleWriteMetrics = true;
-          shuffleBytesWritten += swm.shuffleBytesWritten;
-          shuffleWriteTime += swm.shuffleWriteTime;
+          shuffleBytesWritten += m.shuffleWriteMetrics.shuffleBytesWritten;
+          shuffleWriteTime += m.shuffleWriteMetrics.shuffleWriteTime;
         }
       }
 
-      Optional<InputMetrics> inputMetrics = Optional.absent();
+      InputMetrics inputMetrics = null;
       if (hasInputMetrics) {
-        inputMetrics = Optional.of(new InputMetrics(readMethod, bytesRead));
+        inputMetrics = new InputMetrics(readMethod, bytesRead);
       }
 
-      Optional<ShuffleReadMetrics> shuffleReadMetrics = Optional.absent();
+      ShuffleReadMetrics shuffleReadMetrics = null;
       if (hasShuffleReadMetrics) {
-        shuffleReadMetrics = Optional.of(new ShuffleReadMetrics(
+        shuffleReadMetrics = new ShuffleReadMetrics(
           remoteBlocksFetched,
           localBlocksFetched,
           fetchWaitTime,
-          remoteBytesRead));
+          remoteBytesRead);
       }
 
-      Optional<ShuffleWriteMetrics> shuffleWriteMetrics = Optional.absent();
+      ShuffleWriteMetrics shuffleWriteMetrics = null;
       if (hasShuffleReadMetrics) {
-        shuffleWriteMetrics = Optional.of(new ShuffleWriteMetrics(
+        shuffleWriteMetrics = new ShuffleWriteMetrics(
           shuffleBytesWritten,
-          shuffleWriteTime));
+          shuffleWriteTime);
       }
 
       return new Metrics(

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java?rev=1640633&r1=1640632&r2=1640633&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java Wed Nov 19 21:43:02 2014
@@ -19,7 +19,6 @@ package org.apache.hive.spark.client.met
 
 import java.io.Serializable;
 
-import com.google.common.base.Optional;
 import org.apache.spark.executor.TaskMetrics;
 
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
@@ -48,14 +47,14 @@ public class Metrics implements Serializ
   /** The number of on-disk bytes spilled by tasks. */
   public final long diskBytesSpilled;
   /** If tasks read from a HadoopRDD or from persisted data, metrics on how much data was read. */
-  public final Optional<InputMetrics> inputMetrics;
+  public final InputMetrics inputMetrics;
   /**
    * If tasks read from shuffle output, metrics on getting shuffle data. This includes read metrics
    * aggregated over all the tasks' shuffle dependencies.
    */
-  public final Optional<ShuffleReadMetrics> shuffleReadMetrics;
+  public final ShuffleReadMetrics shuffleReadMetrics;
   /** If tasks wrote to shuffle output, metrics on the written shuffle data. */
-  public final Optional<ShuffleWriteMetrics> shuffleWriteMetrics;
+  public final ShuffleWriteMetrics shuffleWriteMetrics;
 
   public Metrics(
       long executorDeserializeTime,
@@ -65,9 +64,9 @@ public class Metrics implements Serializ
       long resultSerializationTime,
       long memoryBytesSpilled,
       long diskBytesSpilled,
-      Optional<InputMetrics> inputMetrics,
-      Optional<ShuffleReadMetrics> shuffleReadMetrics,
-      Optional<ShuffleWriteMetrics> shuffleWriteMetrics) {
+      InputMetrics inputMetrics,
+      ShuffleReadMetrics shuffleReadMetrics,
+      ShuffleWriteMetrics shuffleWriteMetrics) {
     this.executorDeserializeTime = executorDeserializeTime;
     this.executorRunTime = executorRunTime;
     this.resultSize = resultSize;
@@ -94,24 +93,16 @@ public class Metrics implements Serializ
       optionalShuffleWriteMetrics(metrics));
   }
 
-  private static final Optional<InputMetrics> optionalInputMetric(TaskMetrics metrics) {
-    return metrics.inputMetrics().isDefined()
-      ? Optional.of(new InputMetrics(metrics))
-      : Optional.<InputMetrics>absent();
+  private static final InputMetrics optionalInputMetric(TaskMetrics metrics) {
+    return metrics.inputMetrics().isDefined() ? new InputMetrics(metrics) : null;
   }
 
-  private static final Optional<ShuffleReadMetrics>
-      optionalShuffleReadMetric(TaskMetrics metrics) {
-    return metrics.shuffleReadMetrics().isDefined()
-      ? Optional.of(new ShuffleReadMetrics(metrics))
-      : Optional.<ShuffleReadMetrics>absent();
+  private static final ShuffleReadMetrics optionalShuffleReadMetric(TaskMetrics metrics) {
+    return metrics.shuffleReadMetrics().isDefined() ? new ShuffleReadMetrics(metrics) : null;
   }
 
-  private static final Optional<ShuffleWriteMetrics>
-      optionalShuffleWriteMetrics(TaskMetrics metrics) {
-    return metrics.shuffleWriteMetrics().isDefined()
-      ? Optional.of(new ShuffleWriteMetrics(metrics))
-      : Optional.<ShuffleWriteMetrics>absent();
+  private static final ShuffleWriteMetrics optionalShuffleWriteMetrics(TaskMetrics metrics) {
+    return metrics.shuffleWriteMetrics().isDefined() ? new ShuffleWriteMetrics(metrics) : null;
   }
 
 }

Modified: hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java?rev=1640633&r1=1640632&r2=1640633&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java (original)
+++ hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java Wed Nov 19 21:43:02 2014
@@ -19,7 +19,6 @@ package org.apache.hive.spark.client;
 
 import java.util.Arrays;
 
-import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 import org.junit.Test;
@@ -62,9 +61,7 @@ public class TestMetricsCollection {
   public void testOptionalMetrics() {
     long value = taskValue(1, 1, 1L);
     Metrics metrics = new Metrics(value, value, value, value, value, value, value,
-        Optional.<InputMetrics>absent(),
-        Optional.<ShuffleReadMetrics>absent(),
-        Optional.<ShuffleWriteMetrics>absent());
+        null, null, null);
 
     MetricsCollection collection = new MetricsCollection();
     for (int i : Arrays.asList(1, 2)) {
@@ -72,18 +69,18 @@ public class TestMetricsCollection {
     }
 
     Metrics global = collection.getAllMetrics();
-    assertFalse(global.inputMetrics.isPresent());
-    assertFalse(global.shuffleReadMetrics.isPresent());
-    assertFalse(global.shuffleWriteMetrics.isPresent());
+    assertNull(global.inputMetrics);
+    assertNull(global.shuffleReadMetrics);
+    assertNull(global.shuffleWriteMetrics);
 
     collection.addMetrics(3, 1, 1, makeMetrics(3, 1, 1));
 
     Metrics global2 = collection.getAllMetrics();
-    assertTrue(global2.inputMetrics.isPresent());
-    assertEquals(taskValue(3, 1, 1), global2.inputMetrics.get().bytesRead);
+    assertNotNull(global2.inputMetrics);
+    assertEquals(taskValue(3, 1, 1), global2.inputMetrics.bytesRead);
 
-    assertTrue(global2.shuffleReadMetrics.isPresent());
-    assertTrue(global2.shuffleWriteMetrics.isPresent());
+    assertNotNull(global2.shuffleReadMetrics);
+    assertNotNull(global2.shuffleWriteMetrics);
   }
 
   @Test
@@ -92,28 +89,24 @@ public class TestMetricsCollection {
 
     long value = taskValue(1, 1, 1);
     Metrics metrics1 = new Metrics(value, value, value, value, value, value, value,
-      Optional.fromNullable(new InputMetrics(DataReadMethod.Memory, value)),
-      Optional.<ShuffleReadMetrics>absent(),
-      Optional.<ShuffleWriteMetrics>absent());
+      new InputMetrics(DataReadMethod.Memory, value), null, null);
     Metrics metrics2 = new Metrics(value, value, value, value, value, value, value,
-      Optional.fromNullable(new InputMetrics(DataReadMethod.Disk, value)),
-      Optional.<ShuffleReadMetrics>absent(),
-      Optional.<ShuffleWriteMetrics>absent());
+      new InputMetrics(DataReadMethod.Disk, value), null, null);
 
     collection.addMetrics(1, 1, 1, metrics1);
     collection.addMetrics(1, 1, 2, metrics2);
 
     Metrics global = collection.getAllMetrics();
-    assertTrue(global.inputMetrics.isPresent());
-    assertEquals(DataReadMethod.Multiple, global.inputMetrics.get().readMethod);
+    assertNotNull(global.inputMetrics);
+    assertEquals(DataReadMethod.Multiple, global.inputMetrics.readMethod);
   }
 
   private Metrics makeMetrics(int jobId, int stageId, long taskId) {
     long value = 1000000 * jobId + 1000 * stageId + taskId;
     return new Metrics(value, value, value, value, value, value, value,
-      Optional.fromNullable(new InputMetrics(DataReadMethod.Memory, value)),
-      Optional.fromNullable(new ShuffleReadMetrics((int) value, (int) value, value, value)),
-      Optional.fromNullable(new ShuffleWriteMetrics(value, value)));
+      new InputMetrics(DataReadMethod.Memory, value),
+      new ShuffleReadMetrics((int) value, (int) value, value, value),
+      new ShuffleWriteMetrics(value, value));
   }
 
   /**
@@ -157,19 +150,16 @@ public class TestMetricsCollection {
     assertEquals(expected, metrics.memoryBytesSpilled);
     assertEquals(expected, metrics.diskBytesSpilled);
 
-    InputMetrics im = metrics.inputMetrics.get();
-    assertEquals(DataReadMethod.Memory, im.readMethod);
-    assertEquals(expected, im.bytesRead);
-
-    ShuffleReadMetrics srm = metrics.shuffleReadMetrics.get();
-    assertEquals(expected, srm.remoteBlocksFetched);
-    assertEquals(expected, srm.localBlocksFetched);
-    assertEquals(expected, srm.fetchWaitTime);
-    assertEquals(expected, srm.remoteBytesRead);
-
-    ShuffleWriteMetrics swm = metrics.shuffleWriteMetrics.get();
-    assertEquals(expected, swm.shuffleBytesWritten);
-    assertEquals(expected, swm.shuffleWriteTime);
+    assertEquals(DataReadMethod.Memory, metrics.inputMetrics.readMethod);
+    assertEquals(expected, metrics.inputMetrics.bytesRead);
+
+    assertEquals(expected, metrics.shuffleReadMetrics.remoteBlocksFetched);
+    assertEquals(expected, metrics.shuffleReadMetrics.localBlocksFetched);
+    assertEquals(expected, metrics.shuffleReadMetrics.fetchWaitTime);
+    assertEquals(expected, metrics.shuffleReadMetrics.remoteBytesRead);
+
+    assertEquals(expected, metrics.shuffleWriteMetrics.shuffleBytesWritten);
+    assertEquals(expected, metrics.shuffleWriteMetrics.shuffleWriteTime);
   }
 
 }