You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2016/09/28 01:17:42 UTC

hive git commit: HIVE-14029: Update Spark version to 2.0.0 (Ferdinand Xu, via Li Rui, Szehon Ho and Sergio Pena)

Repository: hive
Updated Branches:
  refs/heads/master 7d3da1778 -> ac977cc88


HIVE-14029: Update Spark version to 2.0.0 (Ferdinand Xu, via Li Rui, Szehon Ho and Sergio Pena)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ac977cc8
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ac977cc8
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ac977cc8

Branch: refs/heads/master
Commit: ac977cc88757b49fbbd5c3bb236adcedcaae396c
Parents: 7d3da17
Author: Ferdinand Xu <ch...@intel.com>
Authored: Wed Sep 28 01:44:32 2016 +0800
Committer: Ferdinand Xu <ch...@intel.com>
Committed: Wed Sep 28 01:44:32 2016 +0800

----------------------------------------------------------------------
 pom.xml                                         | 12 ++-
 ql/pom.xml                                      | 26 +++++-
 .../exec/spark/HiveBaseFunctionResultList.java  | 96 +++++++++-----------
 .../hive/ql/exec/spark/HiveMapFunction.java     |  2 +-
 .../hive/ql/exec/spark/HiveReduceFunction.java  |  2 +-
 .../hive/ql/exec/spark/SortByShuffler.java      | 84 ++++++++---------
 .../spark/status/impl/JobMetricsListener.java   |  4 +-
 .../ql/exec/spark/TestHiveKVResultCache.java    |  5 +-
 spark-client/pom.xml                            | 15 ++-
 .../hive/spark/client/MetricsCollection.java    |  8 +-
 .../apache/hive/spark/client/RemoteDriver.java  |  4 +-
 .../hive/spark/client/metrics/InputMetrics.java |  9 +-
 .../hive/spark/client/metrics/Metrics.java      |  6 +-
 .../client/metrics/ShuffleReadMetrics.java      | 18 ++--
 .../client/metrics/ShuffleWriteMetrics.java     |  4 +-
 .../spark/client/TestMetricsCollection.java     |  8 +-
 16 files changed, 153 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2fb78cd..756cc34 100644
--- a/pom.xml
+++ b/pom.xml
@@ -145,7 +145,7 @@
     <ivy.version>2.4.0</ivy.version>
     <jackson.version>1.9.13</jackson.version>
     <!-- jackson 1 and 2 lines can coexist without issue, as they have different artifactIds -->
-    <jackson.new.version>2.4.2</jackson.new.version>
+    <jackson.new.version>2.6.5</jackson.new.version>
     <jasper.version>5.5.23</jasper.version>
     <jamon.plugin.version>2.3.4</jamon.plugin.version>
     <jamon-runtime.version>2.3.1</jamon-runtime.version>
@@ -155,6 +155,8 @@
     <jdo-api.version>3.0.1</jdo-api.version>
     <jetty.version>7.6.0.v20120127</jetty.version>
     <jersey.version>1.14</jersey.version>
+    <!-- Glassfish jersey is included for Spark client test only -->
+    <glassfish.jersey.version>2.22.2</glassfish.jersey.version>
     <jline.version>2.12</jline.version>
     <jms.version>1.1</jms.version>
     <joda.version>2.8.1</joda.version>
@@ -168,7 +170,7 @@
     <opencsv.version>2.3</opencsv.version>
     <mockito-all.version>1.9.5</mockito-all.version>
     <mina.version>2.0.0-M5</mina.version>
-    <netty.version>4.0.23.Final</netty.version>
+    <netty.version>4.0.29.Final</netty.version>
     <parquet.version>1.8.1</parquet.version>
     <pig.version>0.16.0</pig.version>
     <protobuf.version>2.5.0</protobuf.version>
@@ -178,9 +180,9 @@
     <tez.version>0.8.4</tez.version>
     <slider.version>0.90.2-incubating</slider.version>
     <super-csv.version>2.2.0</super-csv.version>
-    <spark.version>1.6.0</spark.version>
-    <scala.binary.version>2.10</scala.binary.version>
-    <scala.version>2.10.4</scala.version>
+    <spark.version>2.0.0</spark.version>
+    <scala.binary.version>2.11</scala.binary.version>
+    <scala.version>2.11.8</scala.version>
     <tempus-fugit.version>1.1</tempus-fugit.version>
     <snappy.version>0.2</snappy.version>
     <wadl-resourcedoc-doclet.version>1.4</wadl-resourcedoc-doclet.version>

http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/ql/pom.xml
----------------------------------------------------------------------
diff --git a/ql/pom.xml b/ql/pom.xml
index 02ddb80..2a93bb7 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -361,7 +361,7 @@
       <version>${calcite.version}</version>
       <exclusions>
         <!-- hsqldb interferes with the use of derby as the default db
-          in hive's use of datanucleus. 
+          in hive's use of datanucleus.
         -->
         <exclusion>
           <groupId>org.hsqldb</groupId>
@@ -380,14 +380,14 @@
           <artifactId>jackson-core</artifactId>
         </exclusion>
       </exclusions>
-    </dependency>   
+    </dependency>
     <dependency>
       <groupId>org.apache.calcite</groupId>
       <artifactId>calcite-avatica</artifactId>
       <version>${calcite.version}</version>
       <exclusions>
         <!-- hsqldb interferes with the use of derby as the default db
-          in hive's use of datanucleus. 
+          in hive's use of datanucleus.
         -->
         <exclusion>
           <groupId>org.hsqldb</groupId>
@@ -685,6 +685,14 @@
          <groupId>commmons-logging</groupId>
          <artifactId>commons-logging</artifactId>
        </exclusion>
+       <exclusion>
+         <groupId>org.glassfish.jersey.containers</groupId>
+         <artifactId>*</artifactId>
+       </exclusion>
+       <exclusion>
+         <groupId>org.glassfish.jersey.core</groupId>
+         <artifactId>*</artifactId>
+       </exclusion>
      </exclusions>
    </dependency>
     <dependency>
@@ -692,6 +700,18 @@
       <artifactId>jersey-servlet</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.glassfish.jersey.core</groupId>
+      <artifactId>jersey-server</artifactId>
+      <version>${glassfish.jersey.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.glassfish.jersey.containers</groupId>
+      <artifactId>jersey-container-servlet-core</artifactId>
+      <version>${glassfish.jersey.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
index 5b65036..0fc79f4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
@@ -38,15 +38,14 @@ import com.google.common.base.Preconditions;
  *     through Iterator interface.
  */
 @SuppressWarnings("rawtypes")
-public abstract class HiveBaseFunctionResultList<T> implements
-    Iterable, OutputCollector<HiveKey, BytesWritable>, Serializable {
+public abstract class HiveBaseFunctionResultList<T>
+  implements Iterator, OutputCollector<HiveKey, BytesWritable>, Serializable {
   private static final long serialVersionUID = -1L;
   private final Iterator<T> inputIterator;
   private boolean isClosed = false;
 
   // Contains results from last processed input record.
   private final HiveKVResultCache lastRecordOutput;
-  private boolean iteratorAlreadyCreated = false;
 
   public HiveBaseFunctionResultList(Iterator<T> inputIterator) {
     this.inputIterator = inputIterator;
@@ -54,13 +53,6 @@ public abstract class HiveBaseFunctionResultList<T> implements
   }
 
   @Override
-  public Iterator iterator() {
-    Preconditions.checkState(!iteratorAlreadyCreated, "Iterator can only be created once.");
-    iteratorAlreadyCreated = true;
-    return new ResultIterator();
-  }
-
-  @Override
   public void collect(HiveKey key, BytesWritable value) throws IOException {
     lastRecordOutput.add(SparkUtilities.copyHiveKey(key),
         SparkUtilities.copyBytesWritable(value));
@@ -77,57 +69,55 @@ public abstract class HiveBaseFunctionResultList<T> implements
   /** Close the record processor. */
   protected abstract void closeRecordProcessor();
 
-  /** Implement Iterator interface. */
-  public class ResultIterator implements Iterator {
-    @Override
-    public boolean hasNext(){
-      // Return remaining records (if any) from last processed input record.
-      if (lastRecordOutput.hasNext()) {
-        return true;
-      }
+  @Override
+  public boolean hasNext() {
+    // Return remaining records (if any) from last processed input record.
+    if (lastRecordOutput.hasNext()) {
+      return true;
+    }
 
-      // Process the records in the input iterator until
-      //  - new output records are available for serving downstream operator,
-      //  - input records are exhausted or
-      //  - processing is completed.
-      while (inputIterator.hasNext() && !processingDone()) {
-        try {
-          processNextRecord(inputIterator.next());
-          if (lastRecordOutput.hasNext()) {
-            return true;
-          }
-        } catch (IOException ex) {
-          throw new IllegalStateException("Error while processing input.", ex);
+    // Process the records in the input iterator until
+    //  - new output records are available for serving downstream operator,
+    //  - input records are exhausted or
+    //  - processing is completed.
+    while (inputIterator.hasNext() && !processingDone()) {
+      try {
+        processNextRecord(inputIterator.next());
+        if (lastRecordOutput.hasNext()) {
+          return true;
         }
+      } catch (IOException ex) {
+        throw new IllegalStateException("Error while processing input.", ex);
       }
+    }
 
-      // At this point we are done processing the input. Close the record processor
-      if (!isClosed) {
-        closeRecordProcessor();
-        isClosed = true;
-      }
-
-      // It is possible that some operators add records after closing the processor, so make sure
-      // to check the lastRecordOutput
-      if (lastRecordOutput.hasNext()) {
-        return true;
-      }
-
-      lastRecordOutput.clear();
-      return false;
+    // At this point we are done processing the input. Close the record processor
+    if (!isClosed) {
+      closeRecordProcessor();
+      isClosed = true;
     }
 
-    @Override
-    public Tuple2<HiveKey, BytesWritable> next() {
-      if (hasNext()) {
-        return lastRecordOutput.next();
-      }
-      throw new NoSuchElementException("There are no more elements");
+    // It is possible that some operators add records after closing the processor, so make sure
+    // to check the lastRecordOutput
+    if (lastRecordOutput.hasNext()) {
+      return true;
     }
 
-    @Override
-    public void remove() {
-      throw new UnsupportedOperationException("Iterator.remove() is not supported");
+    lastRecordOutput.clear();
+    return false;
+  }
+
+  @Override
+  public Tuple2<HiveKey, BytesWritable> next() {
+    if (hasNext()) {
+      return lastRecordOutput.next();
     }
+    throw new NoSuchElementException("There are no more elements");
   }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException("Iterator.remove() is not supported");
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
index 53c5c0e..ff21a52 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
@@ -38,7 +38,7 @@ public class HiveMapFunction extends HivePairFlatMapFunction<
 
   @SuppressWarnings("unchecked")
   @Override
-  public Iterable<Tuple2<HiveKey, BytesWritable>>
+  public Iterator<Tuple2<HiveKey, BytesWritable>>
   call(Iterator<Tuple2<BytesWritable, BytesWritable>> it) throws Exception {
     initJobConf();
 

http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
index f6595f1..eeb4443 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
@@ -36,7 +36,7 @@ public class HiveReduceFunction extends HivePairFlatMapFunction<
 
   @SuppressWarnings("unchecked")
   @Override
-  public Iterable<Tuple2<HiveKey, BytesWritable>>
+  public Iterator<Tuple2<HiveKey, BytesWritable>>
   call(Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>> it) throws Exception {
     initJobConf();
 

http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
index a6350d3..997ab7e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
@@ -75,60 +75,52 @@ public class SortByShuffler implements SparkShuffler {
     private static final long serialVersionUID = 1L;
 
     @Override
-    public Iterable<Tuple2<HiveKey, Iterable<BytesWritable>>> call(
-        final Iterator<Tuple2<HiveKey, BytesWritable>> it) throws Exception {
+    public Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>> call(
+      final Iterator<Tuple2<HiveKey, BytesWritable>> it) throws Exception {
       // Use input iterator to back returned iterable object.
-      final Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>> resultIt =
-          new Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>>() {
-            HiveKey curKey = null;
-            List<BytesWritable> curValues = new ArrayList<BytesWritable>();
+      return new Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>>() {
+        HiveKey curKey = null;
+        List<BytesWritable> curValues = new ArrayList<BytesWritable>();
 
-            @Override
-            public boolean hasNext() {
-              return it.hasNext() || curKey != null;
-            }
+        @Override
+        public boolean hasNext() {
+          return it.hasNext() || curKey != null;
+        }
 
-            @Override
-            public Tuple2<HiveKey, Iterable<BytesWritable>> next() {
-              // TODO: implement this by accumulating rows with the same key into a list.
-              // Note that this list needs to improved to prevent excessive memory usage, but this
-              // can be done in later phase.
-              while (it.hasNext()) {
-                Tuple2<HiveKey, BytesWritable> pair = it.next();
-                if (curKey != null && !curKey.equals(pair._1())) {
-                  HiveKey key = curKey;
-                  List<BytesWritable> values = curValues;
-                  curKey = pair._1();
-                  curValues = new ArrayList<BytesWritable>();
-                  curValues.add(pair._2());
-                  return new Tuple2<HiveKey, Iterable<BytesWritable>>(key, values);
-                }
-                curKey = pair._1();
-                curValues.add(pair._2());
-              }
-              if (curKey == null) {
-                throw new NoSuchElementException();
-              }
-              // if we get here, this should be the last element we have
+        @Override
+        public Tuple2<HiveKey, Iterable<BytesWritable>> next() {
+          // TODO: implement this by accumulating rows with the same key into a list.
+          // Note that this list needs to improved to prevent excessive memory usage, but this
+          // can be done in later phase.
+          while (it.hasNext()) {
+            Tuple2<HiveKey, BytesWritable> pair = it.next();
+            if (curKey != null && !curKey.equals(pair._1())) {
               HiveKey key = curKey;
-              curKey = null;
-              return new Tuple2<HiveKey, Iterable<BytesWritable>>(key, curValues);
+              List<BytesWritable> values = curValues;
+              curKey = pair._1();
+              curValues = new ArrayList<BytesWritable>();
+              curValues.add(pair._2());
+              return new Tuple2<HiveKey, Iterable<BytesWritable>>(key, values);
             }
+            curKey = pair._1();
+            curValues.add(pair._2());
+          }
+          if (curKey == null) {
+            throw new NoSuchElementException();
+          }
+          // if we get here, this should be the last element we have
+          HiveKey key = curKey;
+          curKey = null;
+          return new Tuple2<HiveKey, Iterable<BytesWritable>>(key, curValues);
+        }
 
-            @Override
-            public void remove() {
-              // Not implemented.
-              // throw Unsupported Method Invocation Exception.
-              throw new UnsupportedOperationException();
-            }
-
-          };
-
-      return new Iterable<Tuple2<HiveKey, Iterable<BytesWritable>>>() {
         @Override
-        public Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>> iterator() {
-          return resultIt;
+        public void remove() {
+          // Not implemented.
+          // throw Unsupported Method Invocation Exception.
+          throw new UnsupportedOperationException();
         }
+
       };
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
index 09c54c1..b48de3e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
@@ -24,15 +24,15 @@ import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.spark.JavaSparkListener;
 import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.scheduler.SparkListener;
 import org.apache.spark.scheduler.SparkListenerJobStart;
 import org.apache.spark.scheduler.SparkListenerTaskEnd;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
-public class JobMetricsListener extends JavaSparkListener {
+public class JobMetricsListener extends SparkListener {
 
   private static final Logger LOG = LoggerFactory.getLogger(JobMetricsListener.class);
 

http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveKVResultCache.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveKVResultCache.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveKVResultCache.java
index ee9f9b7..7bb9c62 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveKVResultCache.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveKVResultCache.java
@@ -282,9 +282,8 @@ public class TestHiveKVResultCache {
 
     resultList.init(rows, threshold, separate, prefix1, prefix2);
     long startTime = System.currentTimeMillis();
-    Iterator it = resultList.iterator();
-    while (it.hasNext()) {
-      Object item = it.next();
+    while (resultList.hasNext()) {
+      Object item = resultList.next();
       if (output != null) {
         output.add((Tuple2<HiveKey, BytesWritable>)item);
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/spark-client/pom.xml
----------------------------------------------------------------------
diff --git a/spark-client/pom.xml b/spark-client/pom.xml
index 6cf3b17..effc13b 100644
--- a/spark-client/pom.xml
+++ b/spark-client/pom.xml
@@ -33,7 +33,6 @@
 
   <properties>
     <hive.path.to.root>..</hive.path.to.root>
-    <scala.binary.version>2.10</scala.binary.version>
     <test.redirectToFile>true</test.redirectToFile>
   </properties>
 
@@ -70,6 +69,14 @@
          <groupId>com.esotericsoftware.kryo</groupId>
          <artifactId>kryo</artifactId>
        </exclusion>
+        <exclusion>
+          <groupId>org.glassfish.jersey.containers</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.glassfish.jersey.core</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
@@ -96,6 +103,12 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.glassfish.jersey.containers</groupId>
+      <artifactId>jersey-container-servlet</artifactId>
+      <version>${glassfish.jersey.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hive</groupId>
       <artifactId>hive-service-rpc</artifactId>
       <version>${project.version}</version>

http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
index e77aa78..0f03a64 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
@@ -151,7 +151,6 @@ public class MetricsCollection {
 
       // Input metrics.
       boolean hasInputMetrics = false;
-      DataReadMethod readMethod = null;
       long bytesRead = 0L;
 
       // Shuffle read metrics.
@@ -177,11 +176,6 @@ public class MetricsCollection {
 
         if (m.inputMetrics != null) {
           hasInputMetrics = true;
-          if (readMethod == null) {
-            readMethod = m.inputMetrics.readMethod;
-          } else if (readMethod != m.inputMetrics.readMethod) {
-            readMethod = DataReadMethod.Multiple;
-          }
           bytesRead += m.inputMetrics.bytesRead;
         }
 
@@ -201,7 +195,7 @@ public class MetricsCollection {
 
       InputMetrics inputMetrics = null;
       if (hasInputMetrics) {
-        inputMetrics = new InputMetrics(readMethod, bytesRead);
+        inputMetrics = new InputMetrics(bytesRead);
       }
 
       ShuffleReadMetrics shuffleReadMetrics = null;

http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
index e3b88d1..ede8ce9 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
@@ -43,11 +43,11 @@ import org.apache.hive.spark.client.metrics.Metrics;
 import org.apache.hive.spark.client.rpc.Rpc;
 import org.apache.hive.spark.client.rpc.RpcConfiguration;
 import org.apache.hive.spark.counter.SparkCounters;
-import org.apache.spark.JavaSparkListener;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkJobInfo;
 import org.apache.spark.api.java.JavaFutureAction;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.scheduler.SparkListener;
 import org.apache.spark.scheduler.SparkListenerJobEnd;
 import org.apache.spark.scheduler.SparkListenerJobStart;
 import org.apache.spark.scheduler.SparkListenerTaskEnd;
@@ -441,7 +441,7 @@ public class RemoteDriver {
 
   }
 
-  private class ClientListener extends JavaSparkListener {
+  private class ClientListener extends SparkListener {
 
     private final Map<Integer, Integer> stageToJobId = Maps.newHashMap();
 

http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java
index e46b67d..f137007 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java
@@ -28,25 +28,20 @@ import org.apache.hadoop.hive.common.classification.InterfaceAudience;
  */
 @InterfaceAudience.Private
 public class InputMetrics implements Serializable {
-
-  public final DataReadMethod readMethod;
   public final long bytesRead;
 
   private InputMetrics() {
     // For Serialization only.
-    this(null, 0L);
+    this(0L);
   }
 
   public InputMetrics(
-      DataReadMethod readMethod,
       long bytesRead) {
-    this.readMethod = readMethod;
     this.bytesRead = bytesRead;
   }
 
   public InputMetrics(TaskMetrics metrics) {
-    this(DataReadMethod.valueOf(metrics.inputMetrics().get().readMethod().toString()),
-      metrics.inputMetrics().get().bytesRead());
+    this(metrics.inputMetrics().bytesRead());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
index a7305cf..418d534 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
@@ -99,15 +99,15 @@ public class Metrics implements Serializable {
   }
 
   private static InputMetrics optionalInputMetric(TaskMetrics metrics) {
-    return metrics.inputMetrics().isDefined() ? new InputMetrics(metrics) : null;
+    return (metrics.inputMetrics() != null) ? new InputMetrics(metrics) : null;
   }
 
   private static ShuffleReadMetrics optionalShuffleReadMetric(TaskMetrics metrics) {
-    return metrics.shuffleReadMetrics().isDefined() ? new ShuffleReadMetrics(metrics) : null;
+    return (metrics.shuffleReadMetrics() != null) ? new ShuffleReadMetrics(metrics) : null;
   }
 
   private static ShuffleWriteMetrics optionalShuffleWriteMetrics(TaskMetrics metrics) {
-    return metrics.shuffleWriteMetrics().isDefined() ? new ShuffleWriteMetrics(metrics) : null;
+    return (metrics.shuffleWriteMetrics() != null) ? new ShuffleWriteMetrics(metrics) : null;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java
index be14c06..9ff4d0f 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java
@@ -30,9 +30,9 @@ import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 public class ShuffleReadMetrics implements Serializable {
 
   /** Number of remote blocks fetched in shuffles by tasks. */
-  public final int remoteBlocksFetched;
+  public final long remoteBlocksFetched;
   /** Number of local blocks fetched in shuffles by tasks. */
-  public final int localBlocksFetched;
+  public final long localBlocksFetched;
   /**
    * Time tasks spent waiting for remote shuffle blocks. This only includes the
    * time blocking on shuffle input data. For instance if block B is being
@@ -49,8 +49,8 @@ public class ShuffleReadMetrics implements Serializable {
   }
 
   public ShuffleReadMetrics(
-      int remoteBlocksFetched,
-      int localBlocksFetched,
+      long remoteBlocksFetched,
+      long localBlocksFetched,
       long fetchWaitTime,
       long remoteBytesRead) {
     this.remoteBlocksFetched = remoteBlocksFetched;
@@ -60,16 +60,16 @@ public class ShuffleReadMetrics implements Serializable {
   }
 
   public ShuffleReadMetrics(TaskMetrics metrics) {
-    this(metrics.shuffleReadMetrics().get().remoteBlocksFetched(),
-      metrics.shuffleReadMetrics().get().localBlocksFetched(),
-      metrics.shuffleReadMetrics().get().fetchWaitTime(),
-      metrics.shuffleReadMetrics().get().remoteBytesRead());
+    this(metrics.shuffleReadMetrics().remoteBlocksFetched(),
+      metrics.shuffleReadMetrics().localBlocksFetched(),
+      metrics.shuffleReadMetrics().fetchWaitTime(),
+      metrics.shuffleReadMetrics().remoteBytesRead());
   }
 
   /**
    * Number of blocks fetched in shuffle by tasks (remote or local).
    */
-  public int getTotalBlocksFetched() {
+  public long getTotalBlocksFetched() {
     return remoteBlocksFetched + localBlocksFetched;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java
index 4420e4d..64a4b86 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java
@@ -47,8 +47,8 @@ public class ShuffleWriteMetrics implements Serializable {
   }
 
   public ShuffleWriteMetrics(TaskMetrics metrics) {
-    this(metrics.shuffleWriteMetrics().get().shuffleBytesWritten(),
-      metrics.shuffleWriteMetrics().get().shuffleWriteTime());
+    this(metrics.shuffleWriteMetrics().shuffleBytesWritten(),
+      metrics.shuffleWriteMetrics().shuffleWriteTime());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
----------------------------------------------------------------------
diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
index 5146e91..8fef66b 100644
--- a/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
+++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
@@ -95,22 +95,21 @@ public class TestMetricsCollection {
 
     long value = taskValue(1, 1, 1);
     Metrics metrics1 = new Metrics(value, value, value, value, value, value, value,
-      new InputMetrics(DataReadMethod.Memory, value), null, null);
+      new InputMetrics(value), null, null);
     Metrics metrics2 = new Metrics(value, value, value, value, value, value, value,
-      new InputMetrics(DataReadMethod.Disk, value), null, null);
+      new InputMetrics(value), null, null);
 
     collection.addMetrics(1, 1, 1, metrics1);
     collection.addMetrics(1, 1, 2, metrics2);
 
     Metrics global = collection.getAllMetrics();
     assertNotNull(global.inputMetrics);
-    assertEquals(DataReadMethod.Multiple, global.inputMetrics.readMethod);
   }
 
   private Metrics makeMetrics(int jobId, int stageId, long taskId) {
     long value = 1000000 * jobId + 1000 * stageId + taskId;
     return new Metrics(value, value, value, value, value, value, value,
-      new InputMetrics(DataReadMethod.Memory, value),
+      new InputMetrics(value),
       new ShuffleReadMetrics((int) value, (int) value, value, value),
       new ShuffleWriteMetrics(value, value));
   }
@@ -156,7 +155,6 @@ public class TestMetricsCollection {
     assertEquals(expected, metrics.memoryBytesSpilled);
     assertEquals(expected, metrics.diskBytesSpilled);
 
-    assertEquals(DataReadMethod.Memory, metrics.inputMetrics.readMethod);
     assertEquals(expected, metrics.inputMetrics.bytesRead);
 
     assertEquals(expected, metrics.shuffleReadMetrics.remoteBlocksFetched);