You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2016/09/28 01:17:42 UTC
hive git commit: HIVE-14029: Update Spark version to 2.0.0 (Ferdinand
Xu, via Li Rui, Szehon Ho and Sergio Pena)
Repository: hive
Updated Branches:
refs/heads/master 7d3da1778 -> ac977cc88
HIVE-14029: Update Spark version to 2.0.0 (Ferdinand Xu, via Li Rui, Szehon Ho and Sergio Pena)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ac977cc8
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ac977cc8
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ac977cc8
Branch: refs/heads/master
Commit: ac977cc88757b49fbbd5c3bb236adcedcaae396c
Parents: 7d3da17
Author: Ferdinand Xu <ch...@intel.com>
Authored: Wed Sep 28 01:44:32 2016 +0800
Committer: Ferdinand Xu <ch...@intel.com>
Committed: Wed Sep 28 01:44:32 2016 +0800
----------------------------------------------------------------------
pom.xml | 12 ++-
ql/pom.xml | 26 +++++-
.../exec/spark/HiveBaseFunctionResultList.java | 96 +++++++++-----------
.../hive/ql/exec/spark/HiveMapFunction.java | 2 +-
.../hive/ql/exec/spark/HiveReduceFunction.java | 2 +-
.../hive/ql/exec/spark/SortByShuffler.java | 84 ++++++++---------
.../spark/status/impl/JobMetricsListener.java | 4 +-
.../ql/exec/spark/TestHiveKVResultCache.java | 5 +-
spark-client/pom.xml | 15 ++-
.../hive/spark/client/MetricsCollection.java | 8 +-
.../apache/hive/spark/client/RemoteDriver.java | 4 +-
.../hive/spark/client/metrics/InputMetrics.java | 9 +-
.../hive/spark/client/metrics/Metrics.java | 6 +-
.../client/metrics/ShuffleReadMetrics.java | 18 ++--
.../client/metrics/ShuffleWriteMetrics.java | 4 +-
.../spark/client/TestMetricsCollection.java | 8 +-
16 files changed, 153 insertions(+), 150 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2fb78cd..756cc34 100644
--- a/pom.xml
+++ b/pom.xml
@@ -145,7 +145,7 @@
<ivy.version>2.4.0</ivy.version>
<jackson.version>1.9.13</jackson.version>
<!-- jackson 1 and 2 lines can coexist without issue, as they have different artifactIds -->
- <jackson.new.version>2.4.2</jackson.new.version>
+ <jackson.new.version>2.6.5</jackson.new.version>
<jasper.version>5.5.23</jasper.version>
<jamon.plugin.version>2.3.4</jamon.plugin.version>
<jamon-runtime.version>2.3.1</jamon-runtime.version>
@@ -155,6 +155,8 @@
<jdo-api.version>3.0.1</jdo-api.version>
<jetty.version>7.6.0.v20120127</jetty.version>
<jersey.version>1.14</jersey.version>
+ <!-- Glassfish jersey is included for Spark client test only -->
+ <glassfish.jersey.version>2.22.2</glassfish.jersey.version>
<jline.version>2.12</jline.version>
<jms.version>1.1</jms.version>
<joda.version>2.8.1</joda.version>
@@ -168,7 +170,7 @@
<opencsv.version>2.3</opencsv.version>
<mockito-all.version>1.9.5</mockito-all.version>
<mina.version>2.0.0-M5</mina.version>
- <netty.version>4.0.23.Final</netty.version>
+ <netty.version>4.0.29.Final</netty.version>
<parquet.version>1.8.1</parquet.version>
<pig.version>0.16.0</pig.version>
<protobuf.version>2.5.0</protobuf.version>
@@ -178,9 +180,9 @@
<tez.version>0.8.4</tez.version>
<slider.version>0.90.2-incubating</slider.version>
<super-csv.version>2.2.0</super-csv.version>
- <spark.version>1.6.0</spark.version>
- <scala.binary.version>2.10</scala.binary.version>
- <scala.version>2.10.4</scala.version>
+ <spark.version>2.0.0</spark.version>
+ <scala.binary.version>2.11</scala.binary.version>
+ <scala.version>2.11.8</scala.version>
<tempus-fugit.version>1.1</tempus-fugit.version>
<snappy.version>0.2</snappy.version>
<wadl-resourcedoc-doclet.version>1.4</wadl-resourcedoc-doclet.version>
http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/ql/pom.xml
----------------------------------------------------------------------
diff --git a/ql/pom.xml b/ql/pom.xml
index 02ddb80..2a93bb7 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -361,7 +361,7 @@
<version>${calcite.version}</version>
<exclusions>
<!-- hsqldb interferes with the use of derby as the default db
- in hive's use of datanucleus.
+ in hive's use of datanucleus.
-->
<exclusion>
<groupId>org.hsqldb</groupId>
@@ -380,14 +380,14 @@
<artifactId>jackson-core</artifactId>
</exclusion>
</exclusions>
- </dependency>
+ </dependency>
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-avatica</artifactId>
<version>${calcite.version}</version>
<exclusions>
<!-- hsqldb interferes with the use of derby as the default db
- in hive's use of datanucleus.
+ in hive's use of datanucleus.
-->
<exclusion>
<groupId>org.hsqldb</groupId>
@@ -685,6 +685,14 @@
<groupId>commmons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.glassfish.jersey.containers</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.jersey.core</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -692,6 +700,18 @@
<artifactId>jersey-servlet</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.core</groupId>
+ <artifactId>jersey-server</artifactId>
+ <version>${glassfish.jersey.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.containers</groupId>
+ <artifactId>jersey-container-servlet-core</artifactId>
+ <version>${glassfish.jersey.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<profiles>
http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
index 5b65036..0fc79f4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
@@ -38,15 +38,14 @@ import com.google.common.base.Preconditions;
* through Iterator interface.
*/
@SuppressWarnings("rawtypes")
-public abstract class HiveBaseFunctionResultList<T> implements
- Iterable, OutputCollector<HiveKey, BytesWritable>, Serializable {
+public abstract class HiveBaseFunctionResultList<T>
+ implements Iterator, OutputCollector<HiveKey, BytesWritable>, Serializable {
private static final long serialVersionUID = -1L;
private final Iterator<T> inputIterator;
private boolean isClosed = false;
// Contains results from last processed input record.
private final HiveKVResultCache lastRecordOutput;
- private boolean iteratorAlreadyCreated = false;
public HiveBaseFunctionResultList(Iterator<T> inputIterator) {
this.inputIterator = inputIterator;
@@ -54,13 +53,6 @@ public abstract class HiveBaseFunctionResultList<T> implements
}
@Override
- public Iterator iterator() {
- Preconditions.checkState(!iteratorAlreadyCreated, "Iterator can only be created once.");
- iteratorAlreadyCreated = true;
- return new ResultIterator();
- }
-
- @Override
public void collect(HiveKey key, BytesWritable value) throws IOException {
lastRecordOutput.add(SparkUtilities.copyHiveKey(key),
SparkUtilities.copyBytesWritable(value));
@@ -77,57 +69,55 @@ public abstract class HiveBaseFunctionResultList<T> implements
/** Close the record processor. */
protected abstract void closeRecordProcessor();
- /** Implement Iterator interface. */
- public class ResultIterator implements Iterator {
- @Override
- public boolean hasNext(){
- // Return remaining records (if any) from last processed input record.
- if (lastRecordOutput.hasNext()) {
- return true;
- }
+ @Override
+ public boolean hasNext() {
+ // Return remaining records (if any) from last processed input record.
+ if (lastRecordOutput.hasNext()) {
+ return true;
+ }
- // Process the records in the input iterator until
- // - new output records are available for serving downstream operator,
- // - input records are exhausted or
- // - processing is completed.
- while (inputIterator.hasNext() && !processingDone()) {
- try {
- processNextRecord(inputIterator.next());
- if (lastRecordOutput.hasNext()) {
- return true;
- }
- } catch (IOException ex) {
- throw new IllegalStateException("Error while processing input.", ex);
+ // Process the records in the input iterator until
+ // - new output records are available for serving downstream operator,
+ // - input records are exhausted or
+ // - processing is completed.
+ while (inputIterator.hasNext() && !processingDone()) {
+ try {
+ processNextRecord(inputIterator.next());
+ if (lastRecordOutput.hasNext()) {
+ return true;
}
+ } catch (IOException ex) {
+ throw new IllegalStateException("Error while processing input.", ex);
}
+ }
- // At this point we are done processing the input. Close the record processor
- if (!isClosed) {
- closeRecordProcessor();
- isClosed = true;
- }
-
- // It is possible that some operators add records after closing the processor, so make sure
- // to check the lastRecordOutput
- if (lastRecordOutput.hasNext()) {
- return true;
- }
-
- lastRecordOutput.clear();
- return false;
+ // At this point we are done processing the input. Close the record processor
+ if (!isClosed) {
+ closeRecordProcessor();
+ isClosed = true;
}
- @Override
- public Tuple2<HiveKey, BytesWritable> next() {
- if (hasNext()) {
- return lastRecordOutput.next();
- }
- throw new NoSuchElementException("There are no more elements");
+ // It is possible that some operators add records after closing the processor, so make sure
+ // to check the lastRecordOutput
+ if (lastRecordOutput.hasNext()) {
+ return true;
}
- @Override
- public void remove() {
- throw new UnsupportedOperationException("Iterator.remove() is not supported");
+ lastRecordOutput.clear();
+ return false;
+ }
+
+ @Override
+ public Tuple2<HiveKey, BytesWritable> next() {
+ if (hasNext()) {
+ return lastRecordOutput.next();
}
+ throw new NoSuchElementException("There are no more elements");
}
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Iterator.remove() is not supported");
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
index 53c5c0e..ff21a52 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
@@ -38,7 +38,7 @@ public class HiveMapFunction extends HivePairFlatMapFunction<
@SuppressWarnings("unchecked")
@Override
- public Iterable<Tuple2<HiveKey, BytesWritable>>
+ public Iterator<Tuple2<HiveKey, BytesWritable>>
call(Iterator<Tuple2<BytesWritable, BytesWritable>> it) throws Exception {
initJobConf();
http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
index f6595f1..eeb4443 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
@@ -36,7 +36,7 @@ public class HiveReduceFunction extends HivePairFlatMapFunction<
@SuppressWarnings("unchecked")
@Override
- public Iterable<Tuple2<HiveKey, BytesWritable>>
+ public Iterator<Tuple2<HiveKey, BytesWritable>>
call(Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>> it) throws Exception {
initJobConf();
http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
index a6350d3..997ab7e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
@@ -75,60 +75,52 @@ public class SortByShuffler implements SparkShuffler {
private static final long serialVersionUID = 1L;
@Override
- public Iterable<Tuple2<HiveKey, Iterable<BytesWritable>>> call(
- final Iterator<Tuple2<HiveKey, BytesWritable>> it) throws Exception {
+ public Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>> call(
+ final Iterator<Tuple2<HiveKey, BytesWritable>> it) throws Exception {
// Use input iterator to back returned iterable object.
- final Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>> resultIt =
- new Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>>() {
- HiveKey curKey = null;
- List<BytesWritable> curValues = new ArrayList<BytesWritable>();
+ return new Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>>() {
+ HiveKey curKey = null;
+ List<BytesWritable> curValues = new ArrayList<BytesWritable>();
- @Override
- public boolean hasNext() {
- return it.hasNext() || curKey != null;
- }
+ @Override
+ public boolean hasNext() {
+ return it.hasNext() || curKey != null;
+ }
- @Override
- public Tuple2<HiveKey, Iterable<BytesWritable>> next() {
- // TODO: implement this by accumulating rows with the same key into a list.
- // Note that this list needs to improved to prevent excessive memory usage, but this
- // can be done in later phase.
- while (it.hasNext()) {
- Tuple2<HiveKey, BytesWritable> pair = it.next();
- if (curKey != null && !curKey.equals(pair._1())) {
- HiveKey key = curKey;
- List<BytesWritable> values = curValues;
- curKey = pair._1();
- curValues = new ArrayList<BytesWritable>();
- curValues.add(pair._2());
- return new Tuple2<HiveKey, Iterable<BytesWritable>>(key, values);
- }
- curKey = pair._1();
- curValues.add(pair._2());
- }
- if (curKey == null) {
- throw new NoSuchElementException();
- }
- // if we get here, this should be the last element we have
+ @Override
+ public Tuple2<HiveKey, Iterable<BytesWritable>> next() {
+ // TODO: implement this by accumulating rows with the same key into a list.
+ // Note that this list needs to be improved to prevent excessive memory usage, but this
+ // can be done in later phase.
+ while (it.hasNext()) {
+ Tuple2<HiveKey, BytesWritable> pair = it.next();
+ if (curKey != null && !curKey.equals(pair._1())) {
HiveKey key = curKey;
- curKey = null;
- return new Tuple2<HiveKey, Iterable<BytesWritable>>(key, curValues);
+ List<BytesWritable> values = curValues;
+ curKey = pair._1();
+ curValues = new ArrayList<BytesWritable>();
+ curValues.add(pair._2());
+ return new Tuple2<HiveKey, Iterable<BytesWritable>>(key, values);
}
+ curKey = pair._1();
+ curValues.add(pair._2());
+ }
+ if (curKey == null) {
+ throw new NoSuchElementException();
+ }
+ // if we get here, this should be the last element we have
+ HiveKey key = curKey;
+ curKey = null;
+ return new Tuple2<HiveKey, Iterable<BytesWritable>>(key, curValues);
+ }
- @Override
- public void remove() {
- // Not implemented.
- // throw Unsupported Method Invocation Exception.
- throw new UnsupportedOperationException();
- }
-
- };
-
- return new Iterable<Tuple2<HiveKey, Iterable<BytesWritable>>>() {
@Override
- public Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>> iterator() {
- return resultIt;
+ public void remove() {
+ // Not implemented.
+ // throw Unsupported Method Invocation Exception.
+ throw new UnsupportedOperationException();
}
+
};
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
index 09c54c1..b48de3e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
@@ -24,15 +24,15 @@ import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.spark.JavaSparkListener;
import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.SparkListenerTaskEnd;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-public class JobMetricsListener extends JavaSparkListener {
+public class JobMetricsListener extends SparkListener {
private static final Logger LOG = LoggerFactory.getLogger(JobMetricsListener.class);
http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveKVResultCache.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveKVResultCache.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveKVResultCache.java
index ee9f9b7..7bb9c62 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveKVResultCache.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveKVResultCache.java
@@ -282,9 +282,8 @@ public class TestHiveKVResultCache {
resultList.init(rows, threshold, separate, prefix1, prefix2);
long startTime = System.currentTimeMillis();
- Iterator it = resultList.iterator();
- while (it.hasNext()) {
- Object item = it.next();
+ while (resultList.hasNext()) {
+ Object item = resultList.next();
if (output != null) {
output.add((Tuple2<HiveKey, BytesWritable>)item);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/spark-client/pom.xml
----------------------------------------------------------------------
diff --git a/spark-client/pom.xml b/spark-client/pom.xml
index 6cf3b17..effc13b 100644
--- a/spark-client/pom.xml
+++ b/spark-client/pom.xml
@@ -33,7 +33,6 @@
<properties>
<hive.path.to.root>..</hive.path.to.root>
- <scala.binary.version>2.10</scala.binary.version>
<test.redirectToFile>true</test.redirectToFile>
</properties>
@@ -70,6 +69,14 @@
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.glassfish.jersey.containers</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.jersey.core</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
@@ -96,6 +103,12 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.glassfish.jersey.containers</groupId>
+ <artifactId>jersey-container-servlet</artifactId>
+ <version>${glassfish.jersey.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-service-rpc</artifactId>
<version>${project.version}</version>
http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
index e77aa78..0f03a64 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
@@ -151,7 +151,6 @@ public class MetricsCollection {
// Input metrics.
boolean hasInputMetrics = false;
- DataReadMethod readMethod = null;
long bytesRead = 0L;
// Shuffle read metrics.
@@ -177,11 +176,6 @@ public class MetricsCollection {
if (m.inputMetrics != null) {
hasInputMetrics = true;
- if (readMethod == null) {
- readMethod = m.inputMetrics.readMethod;
- } else if (readMethod != m.inputMetrics.readMethod) {
- readMethod = DataReadMethod.Multiple;
- }
bytesRead += m.inputMetrics.bytesRead;
}
@@ -201,7 +195,7 @@ public class MetricsCollection {
InputMetrics inputMetrics = null;
if (hasInputMetrics) {
- inputMetrics = new InputMetrics(readMethod, bytesRead);
+ inputMetrics = new InputMetrics(bytesRead);
}
ShuffleReadMetrics shuffleReadMetrics = null;
http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
index e3b88d1..ede8ce9 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
@@ -43,11 +43,11 @@ import org.apache.hive.spark.client.metrics.Metrics;
import org.apache.hive.spark.client.rpc.Rpc;
import org.apache.hive.spark.client.rpc.RpcConfiguration;
import org.apache.hive.spark.counter.SparkCounters;
-import org.apache.spark.JavaSparkListener;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkJobInfo;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.SparkListenerTaskEnd;
@@ -441,7 +441,7 @@ public class RemoteDriver {
}
- private class ClientListener extends JavaSparkListener {
+ private class ClientListener extends SparkListener {
private final Map<Integer, Integer> stageToJobId = Maps.newHashMap();
http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java
index e46b67d..f137007 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java
@@ -28,25 +28,20 @@ import org.apache.hadoop.hive.common.classification.InterfaceAudience;
*/
@InterfaceAudience.Private
public class InputMetrics implements Serializable {
-
- public final DataReadMethod readMethod;
public final long bytesRead;
private InputMetrics() {
// For Serialization only.
- this(null, 0L);
+ this(0L);
}
public InputMetrics(
- DataReadMethod readMethod,
long bytesRead) {
- this.readMethod = readMethod;
this.bytesRead = bytesRead;
}
public InputMetrics(TaskMetrics metrics) {
- this(DataReadMethod.valueOf(metrics.inputMetrics().get().readMethod().toString()),
- metrics.inputMetrics().get().bytesRead());
+ this(metrics.inputMetrics().bytesRead());
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
index a7305cf..418d534 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
@@ -99,15 +99,15 @@ public class Metrics implements Serializable {
}
private static InputMetrics optionalInputMetric(TaskMetrics metrics) {
- return metrics.inputMetrics().isDefined() ? new InputMetrics(metrics) : null;
+ return (metrics.inputMetrics() != null) ? new InputMetrics(metrics) : null;
}
private static ShuffleReadMetrics optionalShuffleReadMetric(TaskMetrics metrics) {
- return metrics.shuffleReadMetrics().isDefined() ? new ShuffleReadMetrics(metrics) : null;
+ return (metrics.shuffleReadMetrics() != null) ? new ShuffleReadMetrics(metrics) : null;
}
private static ShuffleWriteMetrics optionalShuffleWriteMetrics(TaskMetrics metrics) {
- return metrics.shuffleWriteMetrics().isDefined() ? new ShuffleWriteMetrics(metrics) : null;
+ return (metrics.shuffleWriteMetrics() != null) ? new ShuffleWriteMetrics(metrics) : null;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java
index be14c06..9ff4d0f 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java
@@ -30,9 +30,9 @@ import org.apache.hadoop.hive.common.classification.InterfaceAudience;
public class ShuffleReadMetrics implements Serializable {
/** Number of remote blocks fetched in shuffles by tasks. */
- public final int remoteBlocksFetched;
+ public final long remoteBlocksFetched;
/** Number of local blocks fetched in shuffles by tasks. */
- public final int localBlocksFetched;
+ public final long localBlocksFetched;
/**
* Time tasks spent waiting for remote shuffle blocks. This only includes the
* time blocking on shuffle input data. For instance if block B is being
@@ -49,8 +49,8 @@ public class ShuffleReadMetrics implements Serializable {
}
public ShuffleReadMetrics(
- int remoteBlocksFetched,
- int localBlocksFetched,
+ long remoteBlocksFetched,
+ long localBlocksFetched,
long fetchWaitTime,
long remoteBytesRead) {
this.remoteBlocksFetched = remoteBlocksFetched;
@@ -60,16 +60,16 @@ public class ShuffleReadMetrics implements Serializable {
}
public ShuffleReadMetrics(TaskMetrics metrics) {
- this(metrics.shuffleReadMetrics().get().remoteBlocksFetched(),
- metrics.shuffleReadMetrics().get().localBlocksFetched(),
- metrics.shuffleReadMetrics().get().fetchWaitTime(),
- metrics.shuffleReadMetrics().get().remoteBytesRead());
+ this(metrics.shuffleReadMetrics().remoteBlocksFetched(),
+ metrics.shuffleReadMetrics().localBlocksFetched(),
+ metrics.shuffleReadMetrics().fetchWaitTime(),
+ metrics.shuffleReadMetrics().remoteBytesRead());
}
/**
* Number of blocks fetched in shuffle by tasks (remote or local).
*/
- public int getTotalBlocksFetched() {
+ public long getTotalBlocksFetched() {
return remoteBlocksFetched + localBlocksFetched;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java
index 4420e4d..64a4b86 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java
@@ -47,8 +47,8 @@ public class ShuffleWriteMetrics implements Serializable {
}
public ShuffleWriteMetrics(TaskMetrics metrics) {
- this(metrics.shuffleWriteMetrics().get().shuffleBytesWritten(),
- metrics.shuffleWriteMetrics().get().shuffleWriteTime());
+ this(metrics.shuffleWriteMetrics().shuffleBytesWritten(),
+ metrics.shuffleWriteMetrics().shuffleWriteTime());
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
----------------------------------------------------------------------
diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
index 5146e91..8fef66b 100644
--- a/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
+++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
@@ -95,22 +95,21 @@ public class TestMetricsCollection {
long value = taskValue(1, 1, 1);
Metrics metrics1 = new Metrics(value, value, value, value, value, value, value,
- new InputMetrics(DataReadMethod.Memory, value), null, null);
+ new InputMetrics(value), null, null);
Metrics metrics2 = new Metrics(value, value, value, value, value, value, value,
- new InputMetrics(DataReadMethod.Disk, value), null, null);
+ new InputMetrics(value), null, null);
collection.addMetrics(1, 1, 1, metrics1);
collection.addMetrics(1, 1, 2, metrics2);
Metrics global = collection.getAllMetrics();
assertNotNull(global.inputMetrics);
- assertEquals(DataReadMethod.Multiple, global.inputMetrics.readMethod);
}
private Metrics makeMetrics(int jobId, int stageId, long taskId) {
long value = 1000000 * jobId + 1000 * stageId + taskId;
return new Metrics(value, value, value, value, value, value, value,
- new InputMetrics(DataReadMethod.Memory, value),
+ new InputMetrics(value),
new ShuffleReadMetrics((int) value, (int) value, value, value),
new ShuffleWriteMetrics(value, value));
}
@@ -156,7 +155,6 @@ public class TestMetricsCollection {
assertEquals(expected, metrics.memoryBytesSpilled);
assertEquals(expected, metrics.diskBytesSpilled);
- assertEquals(DataReadMethod.Memory, metrics.inputMetrics.readMethod);
assertEquals(expected, metrics.inputMetrics.bytesRead);
assertEquals(expected, metrics.shuffleReadMetrics.remoteBlocksFetched);