Posted to commits@hive.apache.org by sz...@apache.org on 2014/11/19 22:43:02 UTC
svn commit: r1640633 - in /hive/branches/spark/spark-client/src: main/java/org/apache/hive/spark/client/MetricsCollection.java main/java/org/apache/hive/spark/client/metrics/Metrics.java test/java/org/apache/hive/spark/client/TestMetricsCollection.java
Author: szehon
Date: Wed Nov 19 21:43:02 2014
New Revision: 1640633
URL: http://svn.apache.org/r1640633
Log:
HIVE-8854 : Guava dependency conflict between hive driver and remote spark context [Spark Branch] (Marcelo Vanzin via Szehon)
Modified:
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
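[Editorial note, not part of the commit: the patch below applies one pattern throughout: Guava's Optional is dropped from the spark-client metrics API and replaced with plain nullable fields, so the public API no longer exposes a Guava type whose version can conflict between the Hive driver and the remote Spark context. A minimal, self-contained sketch of that before/after shape, assuming simplified stand-in classes (the names SimpleMetrics and SimpleInputMetrics are illustrative only, not the ones in the patch):

public class OptionalToNullableSketch {

  // Simplified stand-in for org.apache.hive.spark.client.metrics.InputMetrics.
  static class SimpleInputMetrics {
    final long bytesRead;
    SimpleInputMetrics(long bytesRead) { this.bytesRead = bytesRead; }
  }

  // Simplified stand-in for the Metrics class touched by this patch.
  static class SimpleMetrics {
    // Before the patch this field would have been declared as
    //   com.google.common.base.Optional<SimpleInputMetrics> inputMetrics;
    // After the patch it is a plain, possibly-null reference.
    final SimpleInputMetrics inputMetrics;
    SimpleMetrics(SimpleInputMetrics inputMetrics) { this.inputMetrics = inputMetrics; }
  }

  public static void main(String[] args) {
    SimpleMetrics withInput = new SimpleMetrics(new SimpleInputMetrics(42L));
    SimpleMetrics withoutInput = new SimpleMetrics(null);

    // Callers now test for null instead of Optional.isPresent()/get(),
    // mirroring the assertNull/assertNotNull changes in the test diff below.
    for (SimpleMetrics m : new SimpleMetrics[] { withInput, withoutInput }) {
      if (m.inputMetrics != null) {
        System.out.println("bytesRead = " + m.inputMetrics.bytesRead);
      } else {
        System.out.println("no input metrics");
      }
    }
  }
}

End of editorial note; the committed diff follows.]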
Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java?rev=1640633&r1=1640632&r2=1640633&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java Wed Nov 19 21:43:02 2014
@@ -26,7 +26,6 @@ import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.base.Function;
-import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.Collections2;
@@ -178,53 +177,50 @@ public class MetricsCollection {
memoryBytesSpilled += m.memoryBytesSpilled;
diskBytesSpilled += m.diskBytesSpilled;
- if (m.inputMetrics.isPresent()) {
+ if (m.inputMetrics != null) {
hasInputMetrics = true;
- InputMetrics im = m.inputMetrics.get();
if (readMethod == null) {
- readMethod = im.readMethod;
- } else if (readMethod != im.readMethod) {
+ readMethod = m.inputMetrics.readMethod;
+ } else if (readMethod != m.inputMetrics.readMethod) {
readMethod = DataReadMethod.Multiple;
}
- bytesRead += im.bytesRead;
+ bytesRead += m.inputMetrics.bytesRead;
}
- if (m.shuffleReadMetrics.isPresent()) {
- ShuffleReadMetrics srm = m.shuffleReadMetrics.get();
+ if (m.shuffleReadMetrics != null) {
hasShuffleReadMetrics = true;
- remoteBlocksFetched += srm.remoteBlocksFetched;
- localBlocksFetched += srm.localBlocksFetched;
- fetchWaitTime += srm.fetchWaitTime;
- remoteBytesRead += srm.remoteBytesRead;
+ remoteBlocksFetched += m.shuffleReadMetrics.remoteBlocksFetched;
+ localBlocksFetched += m.shuffleReadMetrics.localBlocksFetched;
+ fetchWaitTime += m.shuffleReadMetrics.fetchWaitTime;
+ remoteBytesRead += m.shuffleReadMetrics.remoteBytesRead;
}
- if (m.shuffleWriteMetrics.isPresent()) {
- ShuffleWriteMetrics swm = m.shuffleWriteMetrics.get();
+ if (m.shuffleWriteMetrics != null) {
hasShuffleWriteMetrics = true;
- shuffleBytesWritten += swm.shuffleBytesWritten;
- shuffleWriteTime += swm.shuffleWriteTime;
+ shuffleBytesWritten += m.shuffleWriteMetrics.shuffleBytesWritten;
+ shuffleWriteTime += m.shuffleWriteMetrics.shuffleWriteTime;
}
}
- Optional<InputMetrics> inputMetrics = Optional.absent();
+ InputMetrics inputMetrics = null;
if (hasInputMetrics) {
- inputMetrics = Optional.of(new InputMetrics(readMethod, bytesRead));
+ inputMetrics = new InputMetrics(readMethod, bytesRead);
}
- Optional<ShuffleReadMetrics> shuffleReadMetrics = Optional.absent();
+ ShuffleReadMetrics shuffleReadMetrics = null;
if (hasShuffleReadMetrics) {
- shuffleReadMetrics = Optional.of(new ShuffleReadMetrics(
+ shuffleReadMetrics = new ShuffleReadMetrics(
remoteBlocksFetched,
localBlocksFetched,
fetchWaitTime,
- remoteBytesRead));
+ remoteBytesRead);
}
- Optional<ShuffleWriteMetrics> shuffleWriteMetrics = Optional.absent();
+ ShuffleWriteMetrics shuffleWriteMetrics = null;
if (hasShuffleReadMetrics) {
- shuffleWriteMetrics = Optional.of(new ShuffleWriteMetrics(
+ shuffleWriteMetrics = new ShuffleWriteMetrics(
shuffleBytesWritten,
- shuffleWriteTime));
+ shuffleWriteTime);
}
return new Metrics(
Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java?rev=1640633&r1=1640632&r2=1640633&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java Wed Nov 19 21:43:02 2014
@@ -19,7 +19,6 @@ package org.apache.hive.spark.client.met
import java.io.Serializable;
-import com.google.common.base.Optional;
import org.apache.spark.executor.TaskMetrics;
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
@@ -48,14 +47,14 @@ public class Metrics implements Serializ
/** The number of on-disk bytes spilled by tasks. */
public final long diskBytesSpilled;
/** If tasks read from a HadoopRDD or from persisted data, metrics on how much data was read. */
- public final Optional<InputMetrics> inputMetrics;
+ public final InputMetrics inputMetrics;
/**
* If tasks read from shuffle output, metrics on getting shuffle data. This includes read metrics
* aggregated over all the tasks' shuffle dependencies.
*/
- public final Optional<ShuffleReadMetrics> shuffleReadMetrics;
+ public final ShuffleReadMetrics shuffleReadMetrics;
/** If tasks wrote to shuffle output, metrics on the written shuffle data. */
- public final Optional<ShuffleWriteMetrics> shuffleWriteMetrics;
+ public final ShuffleWriteMetrics shuffleWriteMetrics;
public Metrics(
long executorDeserializeTime,
@@ -65,9 +64,9 @@ public class Metrics implements Serializ
long resultSerializationTime,
long memoryBytesSpilled,
long diskBytesSpilled,
- Optional<InputMetrics> inputMetrics,
- Optional<ShuffleReadMetrics> shuffleReadMetrics,
- Optional<ShuffleWriteMetrics> shuffleWriteMetrics) {
+ InputMetrics inputMetrics,
+ ShuffleReadMetrics shuffleReadMetrics,
+ ShuffleWriteMetrics shuffleWriteMetrics) {
this.executorDeserializeTime = executorDeserializeTime;
this.executorRunTime = executorRunTime;
this.resultSize = resultSize;
@@ -94,24 +93,16 @@ public class Metrics implements Serializ
optionalShuffleWriteMetrics(metrics));
}
- private static final Optional<InputMetrics> optionalInputMetric(TaskMetrics metrics) {
- return metrics.inputMetrics().isDefined()
- ? Optional.of(new InputMetrics(metrics))
- : Optional.<InputMetrics>absent();
+ private static final InputMetrics optionalInputMetric(TaskMetrics metrics) {
+ return metrics.inputMetrics().isDefined() ? new InputMetrics(metrics) : null;
}
- private static final Optional<ShuffleReadMetrics>
- optionalShuffleReadMetric(TaskMetrics metrics) {
- return metrics.shuffleReadMetrics().isDefined()
- ? Optional.of(new ShuffleReadMetrics(metrics))
- : Optional.<ShuffleReadMetrics>absent();
+ private static final ShuffleReadMetrics optionalShuffleReadMetric(TaskMetrics metrics) {
+ return metrics.shuffleReadMetrics().isDefined() ? new ShuffleReadMetrics(metrics) : null;
}
- private static final Optional<ShuffleWriteMetrics>
- optionalShuffleWriteMetrics(TaskMetrics metrics) {
- return metrics.shuffleWriteMetrics().isDefined()
- ? Optional.of(new ShuffleWriteMetrics(metrics))
- : Optional.<ShuffleWriteMetrics>absent();
+ private static final ShuffleWriteMetrics optionalShuffleWriteMetrics(TaskMetrics metrics) {
+ return metrics.shuffleWriteMetrics().isDefined() ? new ShuffleWriteMetrics(metrics) : null;
}
}
Modified: hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java?rev=1640633&r1=1640632&r2=1640633&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java (original)
+++ hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java Wed Nov 19 21:43:02 2014
@@ -19,7 +19,6 @@ package org.apache.hive.spark.client;
import java.util.Arrays;
-import com.google.common.base.Optional;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.junit.Test;
@@ -62,9 +61,7 @@ public class TestMetricsCollection {
public void testOptionalMetrics() {
long value = taskValue(1, 1, 1L);
Metrics metrics = new Metrics(value, value, value, value, value, value, value,
- Optional.<InputMetrics>absent(),
- Optional.<ShuffleReadMetrics>absent(),
- Optional.<ShuffleWriteMetrics>absent());
+ null, null, null);
MetricsCollection collection = new MetricsCollection();
for (int i : Arrays.asList(1, 2)) {
@@ -72,18 +69,18 @@ public class TestMetricsCollection {
}
Metrics global = collection.getAllMetrics();
- assertFalse(global.inputMetrics.isPresent());
- assertFalse(global.shuffleReadMetrics.isPresent());
- assertFalse(global.shuffleWriteMetrics.isPresent());
+ assertNull(global.inputMetrics);
+ assertNull(global.shuffleReadMetrics);
+ assertNull(global.shuffleWriteMetrics);
collection.addMetrics(3, 1, 1, makeMetrics(3, 1, 1));
Metrics global2 = collection.getAllMetrics();
- assertTrue(global2.inputMetrics.isPresent());
- assertEquals(taskValue(3, 1, 1), global2.inputMetrics.get().bytesRead);
+ assertNotNull(global2.inputMetrics);
+ assertEquals(taskValue(3, 1, 1), global2.inputMetrics.bytesRead);
- assertTrue(global2.shuffleReadMetrics.isPresent());
- assertTrue(global2.shuffleWriteMetrics.isPresent());
+ assertNotNull(global2.shuffleReadMetrics);
+ assertNotNull(global2.shuffleWriteMetrics);
}
@Test
@@ -92,28 +89,24 @@ public class TestMetricsCollection {
long value = taskValue(1, 1, 1);
Metrics metrics1 = new Metrics(value, value, value, value, value, value, value,
- Optional.fromNullable(new InputMetrics(DataReadMethod.Memory, value)),
- Optional.<ShuffleReadMetrics>absent(),
- Optional.<ShuffleWriteMetrics>absent());
+ new InputMetrics(DataReadMethod.Memory, value), null, null);
Metrics metrics2 = new Metrics(value, value, value, value, value, value, value,
- Optional.fromNullable(new InputMetrics(DataReadMethod.Disk, value)),
- Optional.<ShuffleReadMetrics>absent(),
- Optional.<ShuffleWriteMetrics>absent());
+ new InputMetrics(DataReadMethod.Disk, value), null, null);
collection.addMetrics(1, 1, 1, metrics1);
collection.addMetrics(1, 1, 2, metrics2);
Metrics global = collection.getAllMetrics();
- assertTrue(global.inputMetrics.isPresent());
- assertEquals(DataReadMethod.Multiple, global.inputMetrics.get().readMethod);
+ assertNotNull(global.inputMetrics);
+ assertEquals(DataReadMethod.Multiple, global.inputMetrics.readMethod);
}
private Metrics makeMetrics(int jobId, int stageId, long taskId) {
long value = 1000000 * jobId + 1000 * stageId + taskId;
return new Metrics(value, value, value, value, value, value, value,
- Optional.fromNullable(new InputMetrics(DataReadMethod.Memory, value)),
- Optional.fromNullable(new ShuffleReadMetrics((int) value, (int) value, value, value)),
- Optional.fromNullable(new ShuffleWriteMetrics(value, value)));
+ new InputMetrics(DataReadMethod.Memory, value),
+ new ShuffleReadMetrics((int) value, (int) value, value, value),
+ new ShuffleWriteMetrics(value, value));
}
/**
@@ -157,19 +150,16 @@ public class TestMetricsCollection {
assertEquals(expected, metrics.memoryBytesSpilled);
assertEquals(expected, metrics.diskBytesSpilled);
- InputMetrics im = metrics.inputMetrics.get();
- assertEquals(DataReadMethod.Memory, im.readMethod);
- assertEquals(expected, im.bytesRead);
-
- ShuffleReadMetrics srm = metrics.shuffleReadMetrics.get();
- assertEquals(expected, srm.remoteBlocksFetched);
- assertEquals(expected, srm.localBlocksFetched);
- assertEquals(expected, srm.fetchWaitTime);
- assertEquals(expected, srm.remoteBytesRead);
-
- ShuffleWriteMetrics swm = metrics.shuffleWriteMetrics.get();
- assertEquals(expected, swm.shuffleBytesWritten);
- assertEquals(expected, swm.shuffleWriteTime);
+ assertEquals(DataReadMethod.Memory, metrics.inputMetrics.readMethod);
+ assertEquals(expected, metrics.inputMetrics.bytesRead);
+
+ assertEquals(expected, metrics.shuffleReadMetrics.remoteBlocksFetched);
+ assertEquals(expected, metrics.shuffleReadMetrics.localBlocksFetched);
+ assertEquals(expected, metrics.shuffleReadMetrics.fetchWaitTime);
+ assertEquals(expected, metrics.shuffleReadMetrics.remoteBytesRead);
+
+ assertEquals(expected, metrics.shuffleWriteMetrics.shuffleBytesWritten);
+ assertEquals(expected, metrics.shuffleWriteMetrics.shuffleWriteTime);
}
}