You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sz...@apache.org on 2014/10/16 19:42:52 UTC

svn commit: r1632396 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter: ./ SparkCounter.java SparkCounterGroup.java SparkCounters.java

Author: szehon
Date: Thu Oct 16 17:42:52 2014
New Revision: 1632396

URL: http://svn.apache.org/r1632396
Log:
HIVE-8456 : Support Hive Counter to collect spark job metric[Spark Branch] (Chengxiang Li, reviewed by Rui Li)

Added:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounter.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounterGroup.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounters.java

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounter.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounter.java?rev=1632396&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounter.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounter.java Thu Oct 16 17:42:52 2014
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.counter;
+
+import java.io.Serializable;
+
+import org.apache.spark.Accumulator;
+import org.apache.spark.AccumulatorParam;
+import org.apache.spark.api.java.JavaSparkContext;
+
+/**
+ * A single named counter whose value is held in a Spark {@link Accumulator} of {@code Long}.
+ * The accumulator is registered with the supplied {@link JavaSparkContext} when the counter
+ * is constructed, under the name {@code groupName + "_" + name}.
+ */
+public class SparkCounter implements Serializable {
+
+  private String name;
+  private String displayName;
+  private Accumulator<Long> accumulator;
+
+  /**
+   * Creates the counter and registers its backing accumulator with the Spark context.
+   *
+   * @param name         counter name, also used as the suffix of the accumulator name
+   * @param displayName  human readable name of the counter
+   * @param groupName    name of the owning group, used as the prefix of the accumulator name
+   * @param initValue    initial value of the accumulator
+   * @param sparkContext Spark context used to create the accumulator
+   */
+  public SparkCounter(
+    String name,
+    String displayName,
+    String groupName,
+    long initValue,
+    JavaSparkContext sparkContext) {
+
+    this.name = name;
+    this.displayName = displayName;
+    LongAccumulatorParam longParam = new LongAccumulatorParam();
+    String accumulatorName = groupName + "_" + name;
+    this.accumulator = sparkContext.accumulator(initValue, accumulatorName, longParam);
+  }
+
+  /**
+   * @return the current value of the backing accumulator
+   */
+  public long getValue() {
+    return accumulator.value();
+  }
+
+  /**
+   * Adds {@code incr} to the backing accumulator.
+   *
+   * @param incr amount to add
+   */
+  public void increment(long incr) {
+    accumulator.add(incr);
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public String getDisplayName() {
+    return displayName;
+  }
+
+  public void setDisplayName(String displayName) {
+    this.displayName = displayName;
+  }
+
+  /**
+   * Addition over {@code Long} values for the backing accumulator.
+   *
+   * Declared static so that an instance does not carry a hidden reference to the enclosing
+   * SparkCounter; the param object is handed to the accumulator and travels with it, and it
+   * needs no state from the counter.
+   */
+  static class LongAccumulatorParam implements AccumulatorParam<Long> {
+
+    @Override
+    public Long addAccumulator(Long t1, Long t2) {
+      return t1 + t2;
+    }
+
+    @Override
+    public Long addInPlace(Long r1, Long r2) {
+      return r1 + r2;
+    }
+
+    /**
+     * The identity value for addition; the supplied initial value is intentionally ignored.
+     */
+    @Override
+    public Long zero(Long initialValue) {
+      return 0L;
+    }
+  }
+
+}

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounterGroup.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounterGroup.java?rev=1632396&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounterGroup.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounterGroup.java Thu Oct 16 17:42:52 2014
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.counter;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.util.ResourceBundles;
+import org.apache.spark.api.java.JavaSparkContext;
+
+/**
+ * A named group that holds all counters of the same kind, keyed by counter name.
+ */
+public class SparkCounterGroup implements Serializable {
+
+  private String groupName;
+  private String groupDisplayName;
+  private Map<String, SparkCounter> sparkCounters;
+
+  // Transient: not part of the serialized form, so it is null on a deserialized copy.
+  private transient JavaSparkContext javaSparkContext;
+
+  /**
+   * @param groupName        name of the group
+   * @param groupDisplayName human readable name of the group
+   * @param javaSparkContext Spark context passed on to every counter created in this group
+   */
+  public SparkCounterGroup(
+    String groupName,
+    String groupDisplayName,
+    JavaSparkContext javaSparkContext) {
+
+    this.groupName = groupName;
+    this.groupDisplayName = groupDisplayName;
+    this.javaSparkContext = javaSparkContext;
+    sparkCounters = new HashMap<String, SparkCounter>();
+  }
+
+  /**
+   * Creates a counter in this group and registers it under {@code name}, replacing any
+   * counter previously registered under the same name.
+   *
+   * @param name      counter name
+   * @param initValue initial counter value
+   */
+  public void createCounter(String name, long initValue) {
+    // Resolve the counter's display name from this group's resource bundle, falling back
+    // to the raw counter name. The group-name lookup is not appropriate here because it
+    // would treat the counter name as if it were a group (bundle) name.
+    String displayName = ResourceBundles.getCounterName(groupName, name, name);
+    SparkCounter counter = new SparkCounter(name, displayName, groupName, initValue, javaSparkContext);
+    sparkCounters.put(name, counter);
+  }
+
+  /**
+   * @param name counter name
+   * @return the counter registered under {@code name}, or {@code null} if there is none
+   */
+  public SparkCounter getCounter(String name) {
+    return sparkCounters.get(name);
+  }
+
+  public String getGroupName() {
+    return groupName;
+  }
+
+  public String getGroupDisplayName() {
+    return groupDisplayName;
+  }
+
+  public void setGroupDisplayName(String groupDisplayName) {
+    this.groupDisplayName = groupDisplayName;
+  }
+
+  /**
+   * @return the live map of counters in this group, keyed by counter name
+   */
+  public Map<String, SparkCounter> getSparkCounters() {
+    return sparkCounters;
+  }
+}

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounters.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounters.java?rev=1632396&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounters.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounters.java Thu Oct 16 17:42:52 2014
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.counter;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.MapOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.mapreduce.util.ResourceBundles;
+import org.apache.spark.api.java.JavaSparkContext;
+
+/**
+ * SparkCounters is used to collect Hive operator metrics through Spark accumulators. Spark
+ * accumulators have a few limitations:
+ * 1. An accumulator should be created on the Spark context side.
+ * 2. Spark tasks can only increment the metric count.
+ * 3. The accumulator value can only be read on the Spark context side.
+ * The Spark counter API is designed to fit Hive's requirements, with several access
+ * restrictions that follow from the accumulator limitations mentioned above:
+ * 1. A counter should be created on the driver side if it is to be accessed in a task.
+ * 2. increment can only be invoked on the task side.
+ * 3. Hive can only get a counter value on the driver side.
+ */
+public class SparkCounters implements Serializable {
+
+  private Map<String, SparkCounterGroup> sparkCounterGroups;
+
+  // Transient: not part of the serialized form, so both are null on a deserialized copy.
+  private transient JavaSparkContext javaSparkContext;
+  private transient Configuration hiveConf;
+
+  /**
+   * Creates the counter registry and pre-registers the predefined Hive counters.
+   *
+   * @param javaSparkContext Spark context used to create the backing accumulators
+   * @param hiveConf         configuration from which the Hive counter group name is read
+   */
+  public SparkCounters(JavaSparkContext javaSparkContext, Configuration hiveConf) {
+    this.javaSparkContext = javaSparkContext;
+    this.hiveConf = hiveConf;
+    sparkCounterGroups = new HashMap<String, SparkCounterGroup>();
+    initializeSparkCounters();
+  }
+
+  /**
+   * pre-define all needed Counters here.
+   */
+  private void initializeSparkCounters() {
+    createCounter(HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVECOUNTERGROUP),
+      Operator.HIVECOUNTERCREATEDFILES);
+    createCounter(MapOperator.Counter.DESERIALIZE_ERRORS);
+  }
+
+  /**
+   * Creates a counter for an enum key: the group is the enum's declaring class name and the
+   * counter name is the enum constant name. The initial value is 0.
+   */
+  public void createCounter(Enum<?> key) {
+    createCounter(key.getDeclaringClass().getName(), key.name());
+  }
+
+  /**
+   * Creates a counter with an initial value of 0, creating the group if it does not exist yet.
+   */
+  public void createCounter(String groupName, String counterName) {
+    createCounter(groupName, counterName, 0L);
+  }
+
+  /**
+   * Creates a counter with the given initial value, creating the group if it does not exist yet.
+   */
+  public void createCounter(String groupName, String counterName, long initValue) {
+    getGroup(groupName).createCounter(counterName, initValue);
+  }
+
+  /**
+   * Increments the counter identified by an enum key.
+   *
+   * @throws RuntimeException if the counter has not been created
+   */
+  public void increment(Enum<?> key, long incrValue) {
+    increment(key.getDeclaringClass().getName(), key.name(), incrValue);
+  }
+
+  /**
+   * Increments the named counter by {@code value}.
+   *
+   * @throws RuntimeException if the counter has not been created
+   */
+  public void increment(String groupName, String counterName, long value) {
+    requireCounter(groupName, counterName).increment(value);
+  }
+
+  /**
+   * @return the current value of the named counter
+   * @throws RuntimeException if the counter has not been created
+   */
+  public long getValue(String groupName, String counterName) {
+    return requireCounter(groupName, counterName).getValue();
+  }
+
+  /**
+   * @return the named counter, or {@code null} if the group or the counter does not exist
+   */
+  public SparkCounter getCounter(String groupName, String counterName) {
+    return findCounter(groupName, counterName);
+  }
+
+  /**
+   * @return the counter identified by an enum key, or {@code null} if it does not exist
+   */
+  public SparkCounter getCounter(Enum<?> key) {
+    return getCounter(key.getDeclaringClass().getName(), key.name());
+  }
+
+  /**
+   * Looks a counter up without creating anything. Lookups must not register an empty group
+   * as a side effect: a group created from a deserialized copy would be built around a null
+   * Spark context and could never hold a usable counter.
+   */
+  private SparkCounter findCounter(String groupName, String counterName) {
+    SparkCounterGroup group = sparkCounterGroups.get(groupName);
+    if (group == null) {
+      return null;
+    }
+    return group.getCounter(counterName);
+  }
+
+  /**
+   * Looks a counter up and fails if it has not been created.
+   */
+  private SparkCounter requireCounter(String groupName, String counterName) {
+    SparkCounter counter = findCounter(groupName, counterName);
+    if (counter == null) {
+      throw new RuntimeException(
+        String.format("counter[%s, %s] has not initialized before.", groupName, counterName));
+    }
+    return counter;
+  }
+
+  /**
+   * Returns the named group, creating and registering it first if it does not exist yet.
+   * Only used on the counter creation path.
+   */
+  private SparkCounterGroup getGroup(String groupName) {
+    SparkCounterGroup group = sparkCounterGroups.get(groupName);
+    if (group == null) {
+      String groupDisplayName = ResourceBundles.getCounterGroupName(groupName, groupName);
+      group = new SparkCounterGroup(groupName, groupDisplayName, javaSparkContext);
+      sparkCounterGroups.put(groupName, group);
+    }
+    return group;
+  }
+
+  /**
+   * @return the live map of counter groups, keyed by group name
+   */
+  public Map<String, SparkCounterGroup> getSparkCounterGroups() {
+    return sparkCounterGroups;
+  }
+}