You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2014/11/04 16:10:32 UTC

svn commit: r1636599 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark: ./ Statistic/ status/ status/impl/

Author: xuefu
Date: Tue Nov  4 15:10:32 2014
New Revision: 1636599

URL: http://svn.apache.org/r1636599
Log:
HIVE-8670: Combine Hive Operator statistic and Spark Metric to an uniformed query statistic.[Spark Branch] (Chengxiang via Xuefu)

Added:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistic.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistics.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsBuilder.java
Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java?rev=1636599&r1=1636598&r2=1636599&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java Tue Nov  4 15:10:32 2014
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 
@@ -36,6 +37,9 @@ import org.apache.hadoop.hive.ql.exec.Op
 import org.apache.hadoop.hive.ql.exec.StatsTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistic;
+import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticGroup;
+import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
 import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager;
@@ -103,7 +107,10 @@ public class SparkTask extends Task<Spar
       sparkCounters = jobRef.getSparkJobStatus().getCounter();
       SparkJobMonitor monitor = new SparkJobMonitor(jobRef.getSparkJobStatus());
       monitor.startMonitor();
-      console.printInfo(sparkCounters.toString());
+      SparkStatistics sparkStatistics = jobRef.getSparkJobStatus().getSparkStatistics();
+      if (LOG.isInfoEnabled() && sparkStatistics != null) {
+        logSparkStatistic(sparkStatistics);
+      }
       rc = 0;
     } catch (Exception e) {
       LOG.error("Failed to execute spark task.", e);
@@ -121,6 +128,25 @@ public class SparkTask extends Task<Spar
     return rc;
   }
 
+  /**
+   * Writes the given statistics to the INFO log: one line per group name, followed by
+   * one tab-indented "name: value" line for each statistic in that group.
+   *
+   * @param sparkStatistic the statistics to log
+   */
+  private void logSparkStatistic(SparkStatistics sparkStatistic) {
+    for (Iterator<SparkStatisticGroup> groups = sparkStatistic.getStatisticGroups();
+        groups.hasNext();) {
+      SparkStatisticGroup currentGroup = groups.next();
+      LOG.info(currentGroup.getGroupName());
+      for (Iterator<SparkStatistic> entries = currentGroup.getStatistics();
+          entries.hasNext();) {
+        SparkStatistic entry = entries.next();
+        LOG.info("\t" + entry.getName() + ": " + entry.getValue());
+      }
+    }
+  }
+
   /**
    * close will move the temp files into the right place for the fetch
    * task. If the job has failed it will clean up the files.

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistic.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistic.java?rev=1636599&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistic.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistic.java Tue Nov  4 15:10:32 2014
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.Statistic;
+
+/**
+ * An immutable name/value pair holding a single statistic; both parts are kept as strings.
+ */
+public class SparkStatistic {
+  private final String name;
+  private final String value;
+
+  /**
+   * Creates a statistic; package-private, so only code in this package can instantiate it.
+   *
+   * @param name the statistic's name
+   * @param value the statistic's value, already rendered as a string
+   */
+  SparkStatistic(String name, String value) {
+    this.name = name;
+    this.value = value;
+  }
+
+  /** @return the name passed to the constructor */
+  public String getName() {
+    return name;
+  }
+
+  /** @return the value passed to the constructor */
+  public String getValue() {
+    return value;
+  }
+}

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java?rev=1636599&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java Tue Nov  4 15:10:32 2014
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.Statistic;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A named, read-only group of {@link SparkStatistic} entries.
+ */
+public class SparkStatisticGroup {
+  private final String groupName;
+  private final List<SparkStatistic> statisticList;
+
+  /**
+   * Creates a group from a snapshot of the given statistics.
+   *
+   * @param groupName the name of the group
+   * @param statisticList the statistics of the group; copied, so changes the caller makes to
+   *          the list afterwards are not visible through this group
+   */
+  SparkStatisticGroup(String groupName, List<SparkStatistic> statisticList) {
+    this.groupName = groupName;
+    // unmodifiableList() is only a view over its argument, so wrap a private copy instead of
+    // the caller's list (SparkStatisticsBuilder keeps appending to the lists it hands in).
+    this.statisticList =
+        Collections.unmodifiableList(new ArrayList<SparkStatistic>(statisticList));
+  }
+
+  /** @return the name of this group */
+  public String getGroupName() {
+    return groupName;
+  }
+
+  /** @return an iterator over the statistics of this group; it does not support removal */
+  public Iterator<SparkStatistic> getStatistics() {
+    return this.statisticList.iterator();
+  }
+}

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistics.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistics.java?rev=1636599&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistics.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistics.java Tue Nov  4 15:10:32 2014
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.Statistic;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A read-only collection of {@link SparkStatisticGroup}s.
+ */
+public class SparkStatistics {
+  private final List<SparkStatisticGroup> statisticGroups;
+
+  /**
+   * Creates the collection from a snapshot of the given groups.
+   *
+   * @param statisticGroups the groups to expose; copied, so changes the caller makes to the
+   *          list afterwards are not visible through this object
+   */
+  SparkStatistics(List<SparkStatisticGroup> statisticGroups) {
+    // unmodifiableList() is only a view over its argument, so wrap a private copy.
+    this.statisticGroups =
+        Collections.unmodifiableList(new ArrayList<SparkStatisticGroup>(statisticGroups));
+  }
+
+  /** @return an iterator over the statistic groups; it does not support removal */
+  public Iterator<SparkStatisticGroup> getStatisticGroups() {
+    return this.statisticGroups.iterator();
+  }
+}

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsBuilder.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsBuilder.java?rev=1636599&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsBuilder.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsBuilder.java Tue Nov  4 15:10:32 2014
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.Statistic;
+
+import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounter;
+import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounterGroup;
+import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Collects statistics grouped by name and assembles them into a {@link SparkStatistics}.
+ */
+public class SparkStatisticsBuilder {
+
+  private final Map<String, List<SparkStatistic>> statisticMap;
+
+  public SparkStatisticsBuilder() {
+    statisticMap = new HashMap<String, List<SparkStatistic>>();
+  }
+
+  /**
+   * Creates one {@link SparkStatisticGroup} per group name added so far and wraps them in a
+   * {@link SparkStatistics}. Groups are visited in the iteration order of a HashMap, so their
+   * order is unspecified.
+   *
+   * @return the assembled statistics
+   */
+  public SparkStatistics build() {
+    List<SparkStatisticGroup> statisticGroups = new LinkedList<SparkStatisticGroup>();
+    for (Map.Entry<String, List<SparkStatistic>> entry : statisticMap.entrySet()) {
+      statisticGroups.add(new SparkStatisticGroup(entry.getKey(), entry.getValue()));
+    }
+    return new SparkStatistics(statisticGroups);
+  }
+
+  /**
+   * Adds every counter of every counter group as a statistic, using the group's display name
+   * as the group name and each counter's display name and value as the statistic. A counter
+   * group without counters still results in an (empty) statistic group.
+   *
+   * @param sparkCounters the counters to add
+   * @return this builder
+   */
+  public SparkStatisticsBuilder add(SparkCounters sparkCounters) {
+    for (SparkCounterGroup counterGroup : sparkCounters.getSparkCounterGroups().values()) {
+      List<SparkStatistic> statisticList = getOrCreateGroup(counterGroup.getGroupDisplayName());
+      for (SparkCounter counter : counterGroup.getSparkCounters().values()) {
+        String displayName = counter.getDisplayName();
+        statisticList.add(new SparkStatistic(displayName, Long.toString(counter.getValue())));
+      }
+    }
+    return this;
+  }
+
+  /**
+   * Adds a single statistic to the named group, creating the group on first use.
+   *
+   * @param groupName the name of the group the statistic belongs to
+   * @param name the statistic's name
+   * @param value the statistic's value
+   * @return this builder
+   */
+  public SparkStatisticsBuilder add(String groupName, String name, String value) {
+    getOrCreateGroup(groupName).add(new SparkStatistic(name, value));
+    return this;
+  }
+
+  /**
+   * Returns the list that collects the statistics of the given group, registering a new empty
+   * list in statisticMap when the group has not been seen before.
+   */
+  private List<SparkStatistic> getOrCreateGroup(String groupName) {
+    List<SparkStatistic> statisticList = statisticMap.get(groupName);
+    if (statisticList == null) {
+      statisticList = new LinkedList<SparkStatistic>();
+      statisticMap.put(groupName, statisticList);
+    }
+    return statisticList;
+  }
+}

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java?rev=1636599&r1=1636598&r2=1636599&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java Tue Nov  4 15:10:32 2014
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark.status;
 
+import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
 import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
 
 import java.util.Map;
@@ -36,4 +37,11 @@ public interface SparkJobStatus {
 
   public SparkCounters getCounter();
 
+  /**
+   * Returns the statistics collected for this job.
+   * NOTE(review): SparkTask null-checks the result before logging it; whether implementations
+   * may actually return null is not stated here - confirm and document.
+   */
+  public SparkStatistics getSparkStatistics();
+
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java?rev=1636599&r1=1636598&r2=1636599&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java Tue Nov  4 15:10:32 2014
@@ -22,6 +22,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
+import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder;
 import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobState;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
@@ -120,6 +122,18 @@ public class SimpleSparkJobStatus implem
     return sparkCounters;
   }
 
+  /**
+   * Builds the job's statistics from the counters held in sparkCounters.
+   *
+   * @return a SparkStatistics newly built by a SparkStatisticsBuilder on every call
+   */
+  @Override
+  public SparkStatistics getSparkStatistics() {
+    SparkStatisticsBuilder statisticsBuilder = new SparkStatisticsBuilder();
+    statisticsBuilder.add(sparkCounters);
+    return statisticsBuilder.build();
+  }
+
   private List<StageInfo> getStageInfo(int stageId) {
     List<StageInfo> stageInfos = new LinkedList<StageInfo>();