Posted to commits@hive.apache.org by we...@apache.org on 2017/05/05 17:32:33 UTC

[46/51] [partial] hive git commit: HIVE-14671 : merge master into hive-14535 (Wei Zheng)

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Op.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Op.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Op.java
deleted file mode 100644
index 718791c..0000000
--- a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Op.java
+++ /dev/null
@@ -1,356 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.common.jsonexplain.tez;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.hadoop.hive.common.jsonexplain.tez.Vertex.VertexType;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-public final class Op {
-  public final String name;
-  // tezJsonParser
-  public final TezJsonParser parser;
-  public final String operatorId;
-  public Op parent;
-  public final List<Op> children;
-  public final Map<String, String> attrs;
-  // the jsonObject for this operator
-  public final JSONObject opObject;
-  // the vertex that this operator belongs to
-  public final Vertex vertex;
-  // the vertex that this operator output to
-  public final String outputVertexName;
-  // the Operator type
-  public final OpType type;
-
-  public enum OpType {
-    MAPJOIN, MERGEJOIN, RS, OTHERS
-  };
-
-  public Op(String name, String id, String outputVertexName, List<Op> children,
-      Map<String, String> attrs, JSONObject opObject, Vertex vertex, TezJsonParser tezJsonParser)
-      throws JSONException {
-    super();
-    this.name = name;
-    this.operatorId = id;
-    this.type = deriveOpType(operatorId);
-    this.outputVertexName = outputVertexName;
-    this.children = children;
-    this.attrs = attrs;
-    this.opObject = opObject;
-    this.vertex = vertex;
-    this.parser = tezJsonParser;
-  }
-
-  private OpType deriveOpType(String operatorId) {
-    if (operatorId != null) {
-      if (operatorId.startsWith(OpType.MAPJOIN.toString())) {
-        return OpType.MAPJOIN;
-      } else if (operatorId.startsWith(OpType.MERGEJOIN.toString())) {
-        return OpType.MERGEJOIN;
-      } else if (operatorId.startsWith(OpType.RS.toString())) {
-        return OpType.RS;
-      } else {
-        return OpType.OTHERS;
-      }
-    } else {
-      return OpType.OTHERS;
-    }
-  }
-
-  private void inlineJoinOp() throws Exception {
-    // inline map join operator
-    if (this.type == OpType.MAPJOIN) {
-      JSONObject joinObj = opObject.getJSONObject(this.name);
-      // get the map for posToVertex
-      JSONObject verticeObj = joinObj.getJSONObject("input vertices:");
-      Map<String, Vertex> posToVertex = new LinkedHashMap<>();
-      for (String pos : JSONObject.getNames(verticeObj)) {
-        String vertexName = verticeObj.getString(pos);
-        // update the connection
-        Connection c = null;
-        for (Connection connection : vertex.parentConnections) {
-          if (connection.from.name.equals(vertexName)) {
-            posToVertex.put(pos, connection.from);
-            c = connection;
-            break;
-          }
-        }
-        if (c != null) {
-          parser.addInline(this, c);
-        }
-      }
-      // update the attrs
-      this.attrs.remove("input vertices:");
-      // update the keys to use operator name
-      JSONObject keys = joinObj.getJSONObject("keys:");
-      // find out the vertex for the big table
-      Set<Vertex> parentVertexes = new HashSet<>();
-      for (Connection connection : vertex.parentConnections) {
-        parentVertexes.add(connection.from);
-      }
-      parentVertexes.removeAll(posToVertex.values());
-      Map<String, String> posToOpId = new LinkedHashMap<>();
-      if (keys.length() != 0) {
-        for (String key : JSONObject.getNames(keys)) {
-          // first search from the posToVertex
-          if (posToVertex.containsKey(key)) {
-            Vertex vertex = posToVertex.get(key);
-            if (vertex.rootOps.size() == 1) {
-              posToOpId.put(key, vertex.rootOps.get(0).operatorId);
-            } else if ((vertex.rootOps.size() == 0 && vertex.vertexType == VertexType.UNION)) {
-              posToOpId.put(key, vertex.name);
-            } else {
-              Op singleRSOp = vertex.getSingleRSOp();
-              if (singleRSOp != null) {
-                posToOpId.put(key, singleRSOp.operatorId);
-              } else {
-                throw new Exception(
-                    "There is either no root operator or more than one root operator in vertex "
-                        + vertex.name
-                        + " when hive explain user is trying to identify the operator id.");
-              }
-            }
-          }
-          // then search from parent
-          else if (parent != null) {
-            posToOpId.put(key, parent.operatorId);
-          }
-          // then assume it is from its own vertex
-          else if (parentVertexes.size() == 1) {
-            Vertex vertex = parentVertexes.iterator().next();
-            parentVertexes.clear();
-            if (vertex.rootOps.size() == 1) {
-              posToOpId.put(key, vertex.rootOps.get(0).operatorId);
-            } else if ((vertex.rootOps.size() == 0 && vertex.vertexType == VertexType.UNION)) {
-              posToOpId.put(key, vertex.name);
-            } else {
-              Op singleRSOp = vertex.getSingleRSOp();
-              if (singleRSOp != null) {
-                posToOpId.put(key, singleRSOp.operatorId);
-              } else {
-                throw new Exception(
-                    "There is either no root operator or more than one root operator in vertex "
-                        + vertex.name
-                        + " when hive explain user is trying to identify the operator id.");
-              }
-            }
-          }
-          // finally throw an exception
-          else {
-            throw new Exception(
-                "Can not find the source operator on one of the branches of map join.");
-          }
-        }
-      }
-      this.attrs.remove("keys:");
-      StringBuffer sb = new StringBuffer();
-      JSONArray conditionMap = joinObj.getJSONArray("condition map:");
-      for (int index = 0; index < conditionMap.length(); index++) {
-        JSONObject cond = conditionMap.getJSONObject(index);
-        String k = (String) cond.keys().next();
-        JSONObject condObject = new JSONObject((String)cond.get(k));
-        String type = condObject.getString("type");
-        String left = condObject.getString("left");
-        String right = condObject.getString("right");
-        if (keys.length() != 0) {
-          sb.append(posToOpId.get(left) + "." + keys.get(left) + "=" + posToOpId.get(right) + "."
-              + keys.get(right) + "(" + type + "),");
-        } else {
-          // probably a cross product
-          sb.append("(" + type + "),");
-        }
-      }
-      this.attrs.remove("condition map:");
-      this.attrs.put("Conds:", sb.substring(0, sb.length() - 1));
-    }
-    // should be merge join
-    else {
-      Map<String, String> posToOpId = new LinkedHashMap<>();
-      if (vertex.mergeJoinDummyVertexs.size() == 0) {
-        if (vertex.tagToInput.size() != vertex.parentConnections.size()) {
-          throw new Exception("tagToInput size " + vertex.tagToInput.size()
-              + " is different from parentConnections size " + vertex.parentConnections.size());
-        }
-        for (Entry<String, String> entry : vertex.tagToInput.entrySet()) {
-          Connection c = null;
-          for (Connection connection : vertex.parentConnections) {
-            if (connection.from.name.equals(entry.getValue())) {
-              Vertex v = connection.from;
-              if (v.rootOps.size() == 1) {
-                posToOpId.put(entry.getKey(), v.rootOps.get(0).operatorId);
-              } else if ((v.rootOps.size() == 0 && v.vertexType == VertexType.UNION)) {
-                posToOpId.put(entry.getKey(), v.name);
-              } else {
-                Op singleRSOp = v.getSingleRSOp();
-                if (singleRSOp != null) {
-                  posToOpId.put(entry.getKey(), singleRSOp.operatorId);
-                } else {
-                  throw new Exception(
-                      "There is either no root operator or more than one root operator in vertex " + v.name
-                          + " when hive explain user is trying to identify the operator id.");
-                }
-              }
-              c = connection;
-              break;
-            }
-          }
-          if (c == null) {
-            throw new Exception("Can not find " + entry.getValue()
-                + " while parsing keys of merge join operator");
-          }
-        }
-      } else {
-        posToOpId.put(vertex.tag, this.parent.operatorId);
-        for (Vertex v : vertex.mergeJoinDummyVertexs) {
-          if (v.rootOps.size() != 1) {
-            throw new Exception("Can not find a single root operator in vertex " + v.name
-                + " when hive explain user is trying to identify the operator id.");
-          }
-          posToOpId.put(v.tag, v.rootOps.get(0).operatorId);
-        }
-      }
-      JSONObject joinObj = opObject.getJSONObject(this.name);
-      // update the keys to use operator name
-      JSONObject keys = joinObj.getJSONObject("keys:");
-      if (keys.length() != 0) {
-        for (String key : JSONObject.getNames(keys)) {
-          if (!posToOpId.containsKey(key)) {
-            throw new Exception(
-                "Can not find the source operator on one of the branches of merge join.");
-          }
-        }
-        // inline merge join operator in a self-join
-        if (this.vertex != null) {
-          for (Vertex v : this.vertex.mergeJoinDummyVertexs) {
-            parser.addInline(this, new Connection(null, v));
-          }
-        }
-      }
-      // update the attrs
-      this.attrs.remove("keys:");
-      StringBuffer sb = new StringBuffer();
-      JSONArray conditionMap = joinObj.getJSONArray("condition map:");
-      for (int index = 0; index < conditionMap.length(); index++) {
-        JSONObject cond = conditionMap.getJSONObject(index);
-        String k = (String) cond.keys().next();
-        JSONObject condObject = new JSONObject((String)cond.get(k));
-        String type = condObject.getString("type");
-        String left = condObject.getString("left");
-        String right = condObject.getString("right");
-        if (keys.length() != 0) {
-          sb.append(posToOpId.get(left) + "." + keys.get(left) + "=" + posToOpId.get(right) + "."
-              + keys.get(right) + "(" + type + "),");
-        } else {
-          // probably a cross product
-          sb.append("(" + type + "),");
-        }
-      }
-      this.attrs.remove("condition map:");
-      this.attrs.put("Conds:", sb.substring(0, sb.length() - 1));
-    }
-  }
-
-  private String getNameWithOpIdStats() {
-    StringBuffer sb = new StringBuffer();
-    sb.append(TezJsonParserUtils.renameReduceOutputOperator(name, vertex));
-    if (operatorId != null) {
-      sb.append(" [" + operatorId + "]");
-    }
-    if (!TezJsonParserUtils.OperatorNoStats.contains(name) && attrs.containsKey("Statistics:")) {
-      sb.append(" (" + attrs.get("Statistics:") + ")");
-    }
-    attrs.remove("Statistics:");
-    return sb.toString();
-  }
-
-  /**
-   * @param printer
-   * @param indentFlag
-   * @param branchOfJoinOp
-   *          indicates whether this operator is a branch of a Join operator,
-   *          which determines the corresponding indent.
-   * @throws Exception
-   */
-  public void print(Printer printer, int indentFlag, boolean branchOfJoinOp) throws Exception {
-    // print name
-    if (parser.printSet.contains(this)) {
-      printer.println(TezJsonParser.prefixString(indentFlag) + " Please refer to the previous "
-          + this.getNameWithOpIdStats());
-      return;
-    }
-    parser.printSet.add(this);
-    if (!branchOfJoinOp) {
-      printer.println(TezJsonParser.prefixString(indentFlag) + this.getNameWithOpIdStats());
-    } else {
-      printer.println(TezJsonParser.prefixString(indentFlag, "<-") + this.getNameWithOpIdStats());
-    }
-    branchOfJoinOp = false;
-    // if this operator is a Map Join Operator or a Merge Join Operator
-    if (this.type == OpType.MAPJOIN || this.type == OpType.MERGEJOIN) {
-      inlineJoinOp();
-      branchOfJoinOp = true;
-    }
-    // if this operator is the last operator, we summarize the non-inlined
-    // vertex
-    List<Connection> noninlined = new ArrayList<>();
-    if (this.parent == null) {
-      if (this.vertex != null) {
-        for (Connection connection : this.vertex.parentConnections) {
-          if (!parser.isInline(connection.from)) {
-            noninlined.add(connection);
-          }
-        }
-      }
-    }
-    // print attr
-    indentFlag++;
-    if (!attrs.isEmpty()) {
-      printer.println(TezJsonParser.prefixString(indentFlag)
-          + TezJsonParserUtils.attrsToString(attrs));
-    }
-    // print inline vertex
-    if (parser.inlineMap.containsKey(this)) {
-      for (int index = 0; index < parser.inlineMap.get(this).size(); index++) {
-        Connection connection = parser.inlineMap.get(this).get(index);
-        connection.from.print(printer, indentFlag, connection.type, this.vertex);
-      }
-    }
-    // print parent op, i.e., where data comes from
-    if (this.parent != null) {
-      this.parent.print(printer, indentFlag, branchOfJoinOp);
-    }
-    // print next vertex
-    else {
-      for (int index = 0; index < noninlined.size(); index++) {
-        Vertex v = noninlined.get(index).from;
-        v.print(printer, indentFlag, noninlined.get(index).type, this.vertex);
-      }
-    }
-  }
-}

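For reference, the deleted Op.deriveOpType above classifies an operator by prefix-matching its operator id against the OpType enum names. A minimal standalone sketch of that logic (the operator ids used below, such as "MAPJOIN_25", are hypothetical examples):

public class OpTypeSketch {
  enum OpType { MAPJOIN, MERGEJOIN, RS, OTHERS }

  // an id like "MAPJOIN_25" starts with the enum name of its operator type
  static OpType deriveOpType(String operatorId) {
    if (operatorId == null) {
      return OpType.OTHERS;
    }
    for (OpType t : new OpType[] { OpType.MAPJOIN, OpType.MERGEJOIN, OpType.RS }) {
      if (operatorId.startsWith(t.toString())) {
        return t;
      }
    }
    return OpType.OTHERS;
  }

  public static void main(String[] args) {
    System.out.println(deriveOpType("MAPJOIN_25")); // MAPJOIN
    System.out.println(deriveOpType("RS_3"));       // RS
    System.out.println(deriveOpType("TS_0"));       // OTHERS
  }
}
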
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Printer.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Printer.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Printer.java
deleted file mode 100644
index d3c91d6..0000000
--- a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Printer.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.common.jsonexplain.tez;
-
-public final class Printer {
-  public static final String lineSeparator = System.getProperty("line.separator");
-  private final StringBuilder builder = new StringBuilder();
-
-  public void print(String string) {
-    builder.append(string);
-  }
-
-  public void println(String string) {
-    builder.append(string);
-    builder.append(lineSeparator);
-  }
-
-  public void println() {
-    builder.append(lineSeparator);
-  }
-  
-  public String toString() {
-    return builder.toString();
-  }
-}

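The deleted Printer above is a thin wrapper around StringBuilder that accumulates the rendered explain plan so it can be emitted in one write. A minimal usage sketch (assumes Printer is on the classpath; the stage and vertex names are hypothetical):

public class PrinterDemo {
  public static void main(String[] args) {
    Printer printer = new Printer();
    printer.println("Stage-1");            // appends the text plus the platform line separator
    printer.println("  Map 1");
    System.out.print(printer.toString());  // emit the accumulated output once
  }
}
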
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Stage.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Stage.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Stage.java
deleted file mode 100644
index 63937f8..0000000
--- a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Stage.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.common.jsonexplain.tez;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.jsonexplain.tez.Vertex.VertexType;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-public final class Stage {
-  //external name is used to show at the console
-  String externalName;
-  //internal name is used to track the stages
-  public final String internalName;
-  //tezJsonParser
-  public final TezJsonParser parser;
-  // upstream stages, e.g., root stage
-  public final List<Stage> parentStages = new ArrayList<>();
-  // downstream stages.
-  public final List<Stage> childStages = new ArrayList<>();
-  public final Map<String, Vertex> vertexs = new LinkedHashMap<>();
-  public final Map<String, String> attrs = new TreeMap<>();
-  Map<Vertex, List<Connection>> tezStageDependency;
-  // some stage may contain only a single operator, e.g., create table operator,
-  // fetch operator.
-  Op op;
-
-  public Stage(String name, TezJsonParser tezJsonParser) {
-    super();
-    internalName = name;
-    externalName = name;
-    parser = tezJsonParser;
-  }
-
-  public void addDependency(JSONObject object, Map<String, Stage> stages) throws JSONException {
-    if (object.has("DEPENDENT STAGES")) {
-      String names = object.getString("DEPENDENT STAGES");
-      for (String name : names.split(",")) {
-        Stage parent = stages.get(name.trim());
-        this.parentStages.add(parent);
-        parent.childStages.add(this);
-      }
-    }
-    if (object.has("CONDITIONAL CHILD TASKS")) {
-      String names = object.getString("CONDITIONAL CHILD TASKS");
-      this.externalName = this.internalName + "(CONDITIONAL CHILD TASKS: " + names + ")";
-      for (String name : names.split(",")) {
-        Stage child = stages.get(name.trim());
-        child.externalName = child.internalName + "(CONDITIONAL)";
-        child.parentStages.add(this);
-        this.childStages.add(child);
-      }
-    }
-  }
-
-  /**
-   * If the stage object contains "Tez", we need to extract the vertices and
-   * edges; otherwise we directly extract operators and/or attributes.
-   * @param object
-   * @throws Exception
-   */
-  public void extractVertex(JSONObject object) throws Exception {
-    if (object.has("Tez")) {
-      this.tezStageDependency = new TreeMap<>();
-      JSONObject tez = (JSONObject) object.get("Tez");
-      JSONObject vertices = tez.getJSONObject("Vertices:");
-      if (tez.has("Edges:")) {
-        JSONObject edges = tez.getJSONObject("Edges:");
-        // iterate for the first time to get all the vertices
-        for (String to : JSONObject.getNames(edges)) {
-          vertexs.put(to, new Vertex(to, vertices.getJSONObject(to), parser));
-        }
-        // iterate for the second time to get all the vertex dependency
-        for (String to : JSONObject.getNames(edges)) {
-          Object o = edges.get(to);
-          Vertex v = vertexs.get(to);
-          // 1 to 1 mapping
-          if (o instanceof JSONObject) {
-            JSONObject obj = (JSONObject) o;
-            String parent = obj.getString("parent");
-            Vertex parentVertex = vertexs.get(parent);
-            if (parentVertex == null) {
-              parentVertex = new Vertex(parent, vertices.getJSONObject(parent), parser);
-              vertexs.put(parent, parentVertex);
-            }
-            String type = obj.getString("type");
-            // for union vertex, we reverse the dependency relationship
-            if (!"CONTAINS".equals(type)) {
-              v.addDependency(new Connection(type, parentVertex));
-              parentVertex.setType(type);
-              parentVertex.children.add(v);
-            } else {
-              parentVertex.addDependency(new Connection(type, v));
-              v.children.add(parentVertex);
-            }
-            this.tezStageDependency.put(v, Arrays.asList(new Connection(type, parentVertex)));
-          } else {
-            // 1 to many mapping
-            JSONArray from = (JSONArray) o;
-            List<Connection> list = new ArrayList<>();
-            for (int index = 0; index < from.length(); index++) {
-              JSONObject obj = from.getJSONObject(index);
-              String parent = obj.getString("parent");
-              Vertex parentVertex = vertexs.get(parent);
-              if (parentVertex == null) {
-                parentVertex = new Vertex(parent, vertices.getJSONObject(parent), parser);
-                vertexs.put(parent, parentVertex);
-              }
-              String type = obj.getString("type");
-              if (!"CONTAINS".equals(type)) {
-                v.addDependency(new Connection(type, parentVertex));
-                parentVertex.setType(type);
-                parentVertex.children.add(v);
-              } else {
-                parentVertex.addDependency(new Connection(type, v));
-                v.children.add(parentVertex);
-              }
-              list.add(new Connection(type, parentVertex));
-            }
-            this.tezStageDependency.put(v, list);
-          }
-        }
-      } else {
-        for (String vertexName : JSONObject.getNames(vertices)) {
-          vertexs.put(vertexName, new Vertex(vertexName, vertices.getJSONObject(vertexName), parser));
-        }
-      }
-      // The opTree in vertex is extracted
-      for (Vertex v : vertexs.values()) {
-        if (v.vertexType == VertexType.MAP || v.vertexType == VertexType.REDUCE) {
-          v.extractOpTree();
-          v.checkMultiReduceOperator();
-        }
-      }
-    } else {
-      String[] names = JSONObject.getNames(object);
-      if (names != null) {
-        for (String name : names) {
-          if (name.contains("Operator")) {
-            this.op = extractOp(name, object.getJSONObject(name));
-          } else {
-            if (!object.get(name).toString().isEmpty()) {
-              attrs.put(name, object.get(name).toString());
-            }
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * This method addresses the create table operator, fetch operator, etc.
-   * @param opName
-   * @param opObj
-   * @return
-   * @throws Exception
-   */
-  Op extractOp(String opName, JSONObject opObj) throws Exception {
-    Map<String, String> attrs = new TreeMap<>();
-    Vertex v = null;
-    if (opObj.length() > 0) {
-      String[] names = JSONObject.getNames(opObj);
-      for (String name : names) {
-        Object o = opObj.get(name);
-        if (isPrintable(o) && !o.toString().isEmpty()) {
-          attrs.put(name, o.toString());
-        } else if (o instanceof JSONObject) {
-          JSONObject attrObj = (JSONObject) o;
-          if (attrObj.length() > 0) {
-            if (name.equals("Processor Tree:")) {
-              JSONObject object = new JSONObject(new LinkedHashMap<>());
-              object.put(name, attrObj);
-              v = new Vertex(null, object, parser);
-              v.extractOpTree();
-            } else {
-              for (String attrName : JSONObject.getNames(attrObj)) {
-                if (!attrObj.get(attrName).toString().isEmpty()) {
-                  attrs.put(attrName, attrObj.get(attrName).toString());
-                }
-              }
-            }
-          }
-        } else {
-          throw new Exception("Unsupported object in " + this.internalName);
-        }
-      }
-    }
-    Op op = new Op(opName, null, null, null, attrs, null, v, parser);
-    if (v != null) {
-      parser.addInline(op, new Connection(null, v));
-    }
-    return op;
-  }
-
-  private boolean isPrintable(Object val) {
-    if (val instanceof Boolean || val instanceof String || val instanceof Integer
-        || val instanceof Long || val instanceof Byte || val instanceof Float
-        || val instanceof Double || val instanceof Path) {
-      return true;
-    }
-    if (val != null && val.getClass().isPrimitive()) {
-      return true;
-    }
-    return false;
-  }
-
-  public void print(Printer printer, int indentFlag) throws Exception {
-    // print stagename
-    if (parser.printSet.contains(this)) {
-      printer.println(TezJsonParser.prefixString(indentFlag) + " Please refer to the previous "
-          + externalName);
-      return;
-    }
-    parser.printSet.add(this);
-    printer.println(TezJsonParser.prefixString(indentFlag) + externalName);
-    // print vertexes
-    indentFlag++;
-    for (Vertex candidate : this.vertexs.values()) {
-      if (!parser.isInline(candidate) && candidate.children.isEmpty()) {
-        candidate.print(printer, indentFlag, null, null);
-      }
-    }
-    if (!attrs.isEmpty()) {
-      printer.println(TezJsonParser.prefixString(indentFlag)
-          + TezJsonParserUtils.attrsToString(attrs));
-    }
-    if (op != null) {
-      op.print(printer, indentFlag, false);
-    }
-    indentFlag++;
-    // print dependent stages
-    for (Stage stage : this.parentStages) {
-      stage.print(printer, indentFlag);
-    }
-  }
-}

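For context, Stage.addDependency above wires parent and child stages together from the comma-separated "DEPENDENT STAGES" string. A minimal sketch of just that parsing step (the stage names are hypothetical):

import java.util.ArrayList;
import java.util.List;

import org.json.JSONObject;

public class StageDependencySketch {
  public static void main(String[] args) throws Exception {
    // hypothetical fragment of "STAGE DEPENDENCIES" for one stage
    JSONObject object = new JSONObject("{\"DEPENDENT STAGES\": \"Stage-1, Stage-2\"}");
    List<String> parents = new ArrayList<>();
    if (object.has("DEPENDENT STAGES")) {
      for (String name : object.getString("DEPENDENT STAGES").split(",")) {
        parents.add(name.trim()); // trim each name, as addDependency does
      }
    }
    System.out.println(parents); // [Stage-1, Stage-2]
  }
}
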
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/TezJsonParser.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/TezJsonParser.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/TezJsonParser.java
index ea86048..294dc6b 100644
--- a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/TezJsonParser.java
+++ b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/TezJsonParser.java
@@ -18,146 +18,29 @@
 
 package org.apache.hadoop.hive.common.jsonexplain.tez;
 
-import java.io.PrintStream;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
+import org.apache.hadoop.hive.common.jsonexplain.DagJsonParser;
 
-import org.apache.hadoop.hive.common.jsonexplain.JsonParser;
-import org.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-public final class TezJsonParser implements JsonParser {
-  public final Map<String, Stage> stages = new LinkedHashMap<>();
-  protected final Logger LOG;
-  // the objects that have been printed.
-  public final Set<Object> printSet = new LinkedHashSet<>();
-  // the vertex that should be inlined. <Operator, list of Vertex that is
-  // inlined>
-  public final Map<Op, List<Connection>> inlineMap = new LinkedHashMap<>();
-
-  public TezJsonParser() {
-    super();
-    LOG = LoggerFactory.getLogger(this.getClass().getName());
-  }
-
-  public void extractStagesAndPlans(JSONObject inputObject) throws Exception {
-    // extract stages
-    JSONObject dependency = inputObject.getJSONObject("STAGE DEPENDENCIES");
-    if (dependency != null && dependency.length() > 0) {
-      // iterate for the first time to get all the names of stages.
-      for (String stageName : JSONObject.getNames(dependency)) {
-        this.stages.put(stageName, new Stage(stageName, this));
-      }
-      // iterate for the second time to get all the dependency.
-      for (String stageName : JSONObject.getNames(dependency)) {
-        JSONObject dependentStageNames = dependency.getJSONObject(stageName);
-        this.stages.get(stageName).addDependency(dependentStageNames, this.stages);
-      }
-    }
-    // extract stage plans
-    JSONObject stagePlans = inputObject.getJSONObject("STAGE PLANS");
-    if (stagePlans != null && stagePlans.length() > 0) {
-      for (String stageName : JSONObject.getNames(stagePlans)) {
-        JSONObject stagePlan = stagePlans.getJSONObject(stageName);
-        this.stages.get(stageName).extractVertex(stagePlan);
-      }
-    }
-  }
-
-  /**
-   * @param indentFlag
-   *          help to generate correct indent
-   * @return
-   */
-  public static String prefixString(int indentFlag) {
-    StringBuilder sb = new StringBuilder();
-    for (int index = 0; index < indentFlag; index++) {
-      sb.append("  ");
-    }
-    return sb.toString();
-  }
-
-  /**
-   * @param indentFlag
-   * @param tail
-   *          help to generate correct indent with a specific tail
-   * @return
-   */
-  public static String prefixString(int indentFlag, String tail) {
-    StringBuilder sb = new StringBuilder();
-    for (int index = 0; index < indentFlag; index++) {
-      sb.append("  ");
-    }
-    int len = sb.length();
-    return sb.replace(len - tail.length(), len, tail).toString();
-  }
+public class TezJsonParser extends DagJsonParser {
 
   @Override
-  public void print(JSONObject inputObject, PrintStream outputStream) throws Exception {
-    LOG.info("JsonParser is parsing:" + inputObject.toString());
-    this.extractStagesAndPlans(inputObject);
-    Printer printer = new Printer();
-    // print out the cbo info
-    if (inputObject.has("cboInfo")) {
-      printer.println(inputObject.getString("cboInfo"));
-      printer.println();
-    }
-    // print out the vertex dependency in root stage
-    for (Stage candidate : this.stages.values()) {
-      if (candidate.tezStageDependency != null && candidate.tezStageDependency.size() > 0) {
-        printer.println("Vertex dependency in root stage");
-        for (Entry<Vertex, List<Connection>> entry : candidate.tezStageDependency.entrySet()) {
-          StringBuilder sb = new StringBuilder();
-          sb.append(entry.getKey().name);
-          sb.append(" <- ");
-          boolean printcomma = false;
-          for (Connection connection : entry.getValue()) {
-            if (printcomma) {
-              sb.append(", ");
-            } else {
-              printcomma = true;
-            }
-            sb.append(connection.from.name + " (" + connection.type + ")");
-          }
-          printer.println(sb.toString());
-        }
-        printer.println();
-      }
+  public String mapEdgeType(String edgeName) {
+    switch (edgeName) {
+      case "BROADCAST_EDGE":
+        return "BROADCAST";
+      case "SIMPLE_EDGE":
+        return "SHUFFLE";
+      case "CUSTOM_SIMPLE_EDGE":
+        return "PARTITION_ONLY_SHUFFLE";
+      case "CUSTOM_EDGE":
+        return "MULTICAST";
+      default:
+        return "UNKNOWN";
     }
-    // print out all the stages that have no childStages.
-    for (Stage candidate : this.stages.values()) {
-      if (candidate.childStages.isEmpty()) {
-        candidate.print(printer, 0);
-      }
-    }
-    outputStream.println(printer.toString());
   }
 
-  public void addInline(Op op, Connection connection) {
-    List<Connection> list = inlineMap.get(op);
-    if (list == null) {
-      list = new ArrayList<>();
-      list.add(connection);
-      inlineMap.put(op, list);
-    } else {
-      list.add(connection);
-    }
-  }
-
-  public boolean isInline(Vertex v) {
-    for (List<Connection> list : inlineMap.values()) {
-      for (Connection connection : list) {
-        if (connection.from.equals(v)) {
-          return true;
-        }
-      }
-    }
-    return false;
+  @Override
+  public String getFrameworkName() {
+    return "Tez";
   }
-}
+}
\ No newline at end of file

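After this change TezJsonParser only supplies the Tez-specific pieces to the shared DagJsonParser: the edge-name mapping and the framework name. A standalone sketch of the same mapping table (not the class itself, which requires DagJsonParser on the classpath):

public class EdgeTypeSketch {
  static String mapEdgeType(String edgeName) {
    switch (edgeName) {
      case "BROADCAST_EDGE":     return "BROADCAST";
      case "SIMPLE_EDGE":        return "SHUFFLE";
      case "CUSTOM_SIMPLE_EDGE": return "PARTITION_ONLY_SHUFFLE";
      case "CUSTOM_EDGE":        return "MULTICAST";
      default:                   return "UNKNOWN";
    }
  }

  public static void main(String[] args) {
    System.out.println(mapEdgeType("SIMPLE_EDGE"));        // SHUFFLE
    System.out.println(mapEdgeType("CUSTOM_SIMPLE_EDGE")); // PARTITION_ONLY_SHUFFLE
  }
}
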
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/TezJsonParserUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/TezJsonParserUtils.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/TezJsonParserUtils.java
deleted file mode 100644
index 363a422..0000000
--- a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/TezJsonParserUtils.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.common.jsonexplain.tez;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-
-public class TezJsonParserUtils {
-
-  public static List<String> OperatorNoStats = Arrays.asList(new String[] { "File Output Operator",
-      "Reduce Output Operator" });
-
-  public static String renameReduceOutputOperator(String operatorName, Vertex vertex) {
-    if (operatorName.equals("Reduce Output Operator") && vertex.edgeType != null) {
-      return vertex.edgeType.name();
-    } else {
-      return operatorName;
-    }
-  }
-
-  public static String attrsToString(Map<String, String> attrs) {
-    StringBuffer sb = new StringBuffer();
-    boolean first = true;
-    for (Entry<String, String> entry : attrs.entrySet()) {
-      if (first) {
-        first = false;
-      } else {
-        sb.append(",");
-      }
-      sb.append(entry.getKey() + entry.getValue());
-    }
-    return sb.toString();
-  }
-}

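attrsToString above joins the attribute entries with commas, concatenating each key directly with its value (the keys produced by this parser already end with ":", e.g. "Statistics:"). A self-contained sketch of that formatting with hypothetical attributes:

import java.util.LinkedHashMap;
import java.util.Map;

public class AttrsToStringSketch {
  public static void main(String[] args) {
    Map<String, String> attrs = new LinkedHashMap<>();
    attrs.put("keys:", " key (type: int)");
    attrs.put("Statistics:", " Num rows: 500");
    StringBuilder sb = new StringBuilder();
    boolean first = true;
    for (Map.Entry<String, String> entry : attrs.entrySet()) {
      if (!first) {
        sb.append(",");
      }
      first = false;
      sb.append(entry.getKey()).append(entry.getValue()); // key already carries the ":"
    }
    System.out.println(sb); // keys: key (type: int),Statistics: Num rows: 500
  }
}
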
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Vertex.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Vertex.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Vertex.java
deleted file mode 100644
index 3d559bd..0000000
--- a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Vertex.java
+++ /dev/null
@@ -1,331 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.common.jsonexplain.tez;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-
-import org.apache.hadoop.hive.common.jsonexplain.tez.Op.OpType;
-import org.apache.hadoop.util.hash.Hash;
-import org.codehaus.jackson.JsonParseException;
-import org.codehaus.jackson.map.JsonMappingException;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-public final class Vertex implements Comparable<Vertex>{
-  public final String name;
-  //tezJsonParser
-  public final TezJsonParser parser;
-  // vertex's parent connections.
-  public final List<Connection> parentConnections = new ArrayList<>();
-  // vertex's children vertex.
-  public final List<Vertex> children = new ArrayList<>();
-  // the jsonObject for this vertex
-  public final JSONObject vertexObject;
-  // whether this vertex is dummy (which does not really exist but is created),
-  // e.g., a dummy vertex for a mergejoin branch
-  public boolean dummy;
-  // the rootOps in this vertex
-  public final List<Op> rootOps = new ArrayList<>();
-  // we create a dummy vertex for a mergejoin branch for a self join if this
-  // vertex is a mergejoin
-  public final List<Vertex> mergeJoinDummyVertexs = new ArrayList<>();
-  // whether this vertex has multiple reduce operators
-  public boolean hasMultiReduceOp = false;
-  // execution mode
-  public String executionMode = "";
-  // tagToInput for reduce work
-  public Map<String, String> tagToInput = new LinkedHashMap<>();
-  // tag
-  public String tag;
-
-  public static enum VertexType {
-    MAP, REDUCE, UNION, UNKNOWN
-  };
-  public VertexType vertexType;
-
-  public static enum EdgeType {
-    BROADCAST, SHUFFLE, MULTICAST, PARTITION_ONLY_SHUFFLE, UNKNOWN
-  };
-  public EdgeType edgeType;
-
-  public Vertex(String name, JSONObject vertexObject, TezJsonParser tezJsonParser) {
-    super();
-    this.name = name;
-    if (this.name != null) {
-      if (this.name.contains("Map")) {
-        this.vertexType = VertexType.MAP;
-      } else if (this.name.contains("Reduce")) {
-        this.vertexType = VertexType.REDUCE;
-      } else if (this.name.contains("Union")) {
-        this.vertexType = VertexType.UNION;
-      } else {
-        this.vertexType = VertexType.UNKNOWN;
-      }
-    } else {
-      this.vertexType = VertexType.UNKNOWN;
-    }
-    this.dummy = false;
-    this.vertexObject = vertexObject;
-    parser = tezJsonParser;
-  }
-
-  public void addDependency(Connection connection) throws JSONException {
-    this.parentConnections.add(connection);
-  }
-
-  /**
-   * We assume that there is a single top-level Map Operator Tree or a
-   * Reduce Operator Tree in a vertex.
-   * @throws JSONException
-   * @throws JsonParseException
-   * @throws JsonMappingException
-   * @throws IOException
-   * @throws Exception
-   */
-  public void extractOpTree() throws JSONException, JsonParseException, JsonMappingException,
-      IOException, Exception {
-    if (vertexObject.length() != 0) {
-      for (String key : JSONObject.getNames(vertexObject)) {
-        if (key.equals("Map Operator Tree:")) {
-          extractOp(vertexObject.getJSONArray(key).getJSONObject(0));
-        } else if (key.equals("Reduce Operator Tree:") || key.equals("Processor Tree:")) {
-          extractOp(vertexObject.getJSONObject(key));
-        } else if (key.equals("Join:")) {
-          // this is the case when we have a map-side SMB join
-          // one input of the join is treated as a dummy vertex
-          JSONArray array = vertexObject.getJSONArray(key);
-          for (int index = 0; index < array.length(); index++) {
-            JSONObject mpOpTree = array.getJSONObject(index);
-            Vertex v = new Vertex(null, mpOpTree, parser);
-            v.extractOpTree();
-            v.dummy = true;
-            mergeJoinDummyVertexs.add(v);
-          }
-        } else if (key.equals("Merge File Operator")) {
-          JSONObject opTree = vertexObject.getJSONObject(key);
-          if (opTree.has("Map Operator Tree:")) {
-            extractOp(opTree.getJSONArray("Map Operator Tree:").getJSONObject(0));
-          } else {
-            throw new Exception("Merge File Operator does not have a Map Operator Tree");
-          }
-        } else if (key.equals("Execution mode:")) {
-          executionMode = " " + vertexObject.getString(key);
-        } else if (key.equals("tagToInput:")) {
-          JSONObject tagToInput = vertexObject.getJSONObject(key);
-          for (String tag : JSONObject.getNames(tagToInput)) {
-            this.tagToInput.put(tag, (String) tagToInput.get(tag));
-          }
-        } else if (key.equals("tag:")) {
-          this.tag = vertexObject.getString(key);
-        } else {
-          throw new Exception("Unsupported operator tree in vertex " + this.name);
-        }
-      }
-    }
-  }
-
-  /**
-   * Assumption: each operator only has one parent but may have many children.
-   * @param operator
-   * @return
-   * @throws JSONException
-   * @throws JsonParseException
-   * @throws JsonMappingException
-   * @throws IOException
-   * @throws Exception
-   */
-  Op extractOp(JSONObject operator) throws JSONException, JsonParseException, JsonMappingException,
-      IOException, Exception {
-    String[] names = JSONObject.getNames(operator);
-    if (names.length != 1) {
-      throw new Exception("Expect only one operator in " + operator.toString());
-    } else {
-      String opName = names[0];
-      JSONObject attrObj = (JSONObject) operator.get(opName);
-      Map<String, String> attrs = new TreeMap<>();
-      List<Op> children = new ArrayList<>();
-      String id = null;
-      String outputVertexName = null;
-      for (String attrName : JSONObject.getNames(attrObj)) {
-        if (attrName.equals("children")) {
-          Object childrenObj = attrObj.get(attrName);
-          if (childrenObj instanceof JSONObject) {
-            if (((JSONObject) childrenObj).length() != 0) {
-              children.add(extractOp((JSONObject) childrenObj));
-            }
-          } else if (childrenObj instanceof JSONArray) {
-            if (((JSONArray) childrenObj).length() != 0) {
-              JSONArray array = ((JSONArray) childrenObj);
-              for (int index = 0; index < array.length(); index++) {
-                children.add(extractOp(array.getJSONObject(index)));
-              }
-            }
-          } else {
-            throw new Exception("Unsupported operator in vertex " + this.name
-                + ": its children are neither a JSONObject nor a JSONArray");
-          }
-        } else {
-          if (attrName.equals("OperatorId:")) {
-            id = attrObj.get(attrName).toString();
-          } else if (attrName.equals("outputname:")) {
-            outputVertexName = attrObj.get(attrName).toString();
-          } else {
-            if (!attrObj.get(attrName).toString().isEmpty()) {
-              attrs.put(attrName, attrObj.get(attrName).toString());
-            }
-          }
-        }
-      }
-      Op op = new Op(opName, id, outputVertexName, children, attrs, operator, this, parser);
-      if (!children.isEmpty()) {
-        for (Op child : children) {
-          child.parent = op;
-        }
-      } else {
-        this.rootOps.add(op);
-      }
-      return op;
-    }
-  }
-
-  public void print(Printer printer, int indentFlag, String type, Vertex callingVertex)
-      throws JSONException, Exception {
-    // print vertexname
-    if (parser.printSet.contains(this) && !hasMultiReduceOp) {
-      if (type != null) {
-        printer.println(TezJsonParser.prefixString(indentFlag, "<-")
-            + " Please refer to the previous " + this.name + " [" + type + "]");
-      } else {
-        printer.println(TezJsonParser.prefixString(indentFlag, "<-")
-            + " Please refer to the previous " + this.name);
-      }
-      return;
-    }
-    parser.printSet.add(this);
-    if (type != null) {
-      printer.println(TezJsonParser.prefixString(indentFlag, "<-") + this.name + " [" + type + "]"
-          + this.executionMode);
-    } else if (this.name != null) {
-      printer.println(TezJsonParser.prefixString(indentFlag) + this.name + this.executionMode);
-    }
-    // print operators
-    if (hasMultiReduceOp && !(callingVertex.vertexType == VertexType.UNION)) {
-      // find the right op
-      Op choose = null;
-      for (Op op : this.rootOps) {
-        if (op.outputVertexName.equals(callingVertex.name)) {
-          choose = op;
-        }
-      }
-      if (choose != null) {
-        choose.print(printer, indentFlag, false);
-      } else {
-        throw new Exception("Can not find the right reduce output operator for vertex " + this.name);
-      }
-    } else {
-      for (Op op : this.rootOps) {
-        // dummy vertex is treated as a branch of a join operator
-        if (this.dummy) {
-          op.print(printer, indentFlag, true);
-        } else {
-          op.print(printer, indentFlag, false);
-        }
-      }
-    }
-    if (vertexType == VertexType.UNION) {
-      // print dependent vertexs
-      indentFlag++;
-      for (int index = 0; index < this.parentConnections.size(); index++) {
-        Connection connection = this.parentConnections.get(index);
-        connection.from.print(printer, indentFlag, connection.type, this);
-      }
-    }
-  }
-
-  /**
-   * We check if a vertex has multiple reduce operators.
-   */
-  public void checkMultiReduceOperator() {
-    // check if it is a reduce vertex with more than one root operator
-    if (!this.name.contains("Reduce") || this.rootOps.size() < 2) {
-      return;
-    }
-    // check if all the root ops are reduce output operators
-    for (Op op : this.rootOps) {
-      if (op.type != OpType.RS) {
-        return;
-      }
-    }
-    this.hasMultiReduceOp = true;
-  }
-
-  public void setType(String type) {
-    switch (type) {
-    case "BROADCAST_EDGE":
-      this.edgeType = EdgeType.BROADCAST;
-      break;
-    case "SIMPLE_EDGE":
-      this.edgeType = EdgeType.SHUFFLE;
-      break;
-    case "CUSTOM_SIMPLE_EDGE":
-      this.edgeType = EdgeType.PARTITION_ONLY_SHUFFLE;
-      break;
-    case "CUSTOM_EDGE":
-      this.edgeType = EdgeType.MULTICAST;
-      break;
-    default:
-      this.edgeType = EdgeType.UNKNOWN;
-    }
-  }
-
-  //The following code should be gone after HIVE-11075 using topological order
-  @Override
-  public int compareTo(Vertex o) {
-    return this.name.compareTo(o.name);
-  }
-
-  public Op getSingleRSOp() {
-    if (rootOps.size() == 0) {
-      return null;
-    } else {
-      Op ret = null;
-      for (Op op : rootOps) {
-        if (op.type == OpType.RS) {
-          if (ret == null) {
-            ret = op;
-          } else {
-            // find more than one RS Op
-            return null;
-          }
-        }
-      }
-      return ret;
-    }
-  }
-}

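The Vertex constructor above infers the vertex type from substrings of the vertex name, and setType maps Tez edge names onto the EdgeType enum. A standalone sketch of the name-based inference (the vertex names are hypothetical):

public class VertexTypeSketch {
  enum VertexType { MAP, REDUCE, UNION, UNKNOWN }

  static VertexType deriveVertexType(String name) {
    if (name == null)            return VertexType.UNKNOWN;
    if (name.contains("Map"))    return VertexType.MAP;
    if (name.contains("Reduce")) return VertexType.REDUCE;
    if (name.contains("Union"))  return VertexType.UNION;
    return VertexType.UNKNOWN;
  }

  public static void main(String[] args) {
    System.out.println(deriveVertexType("Map 1"));     // MAP
    System.out.println(deriveVertexType("Reducer 2")); // REDUCE
    System.out.println(deriveVertexType("Union 3"));   // UNION
  }
}
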
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java b/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java
index 6db5c18..1e026a7 100644
--- a/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java
+++ b/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.hive.common.log;
 
 import com.google.common.base.Function;

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/log/ProgressMonitor.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/log/ProgressMonitor.java b/common/src/java/org/apache/hadoop/hive/common/log/ProgressMonitor.java
index ee02ccb..e7661b4 100644
--- a/common/src/java/org/apache/hadoop/hive/common/log/ProgressMonitor.java
+++ b/common/src/java/org/apache/hadoop/hive/common/log/ProgressMonitor.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.hive.common.log;
 
 import java.util.Collections;

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java
index e8abf6c..2d6c1b4 100644
--- a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java
@@ -44,6 +44,8 @@ import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Lists;
 
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -190,22 +192,8 @@ public class CodahaleMetrics implements org.apache.hadoop.hive.common.metrics.co
     registerAll("threads", new ThreadStatesGaugeSet());
     registerAll("classLoading", new ClassLoadingGaugeSet());
 
-    //Metrics reporter
-    Set<MetricsReporting> finalReporterList = new HashSet<MetricsReporting>();
-    List<String> metricsReporterNames = Lists.newArrayList(
-      Splitter.on(",").trimResults().omitEmptyStrings().split(conf.getVar(HiveConf.ConfVars.HIVE_METRICS_REPORTER)));
-
-    if(metricsReporterNames != null) {
-      for (String metricsReportingName : metricsReporterNames) {
-        try {
-          MetricsReporting reporter = MetricsReporting.valueOf(metricsReportingName.trim().toUpperCase());
-          finalReporterList.add(reporter);
-        } catch (IllegalArgumentException e) {
-          LOGGER.warn("Metrics reporter skipped due to invalid configured reporter: " + metricsReportingName);
-        }
-      }
-    }
-    initReporting(finalReporterList);
+    //initialize reporters
+    initReporting();
   }
 
 
@@ -385,107 +373,99 @@ public class CodahaleMetrics implements org.apache.hadoop.hive.common.metrics.co
   }
 
   /**
-   * Should be only called once to initialize the reporters
+   * Initializes reporters from HIVE_CODAHALE_METRICS_REPORTER_CLASSES, or from HIVE_METRICS_REPORTER if the former is not defined.
+   * Note: if both confs are defined, only HIVE_CODAHALE_METRICS_REPORTER_CLASSES will be used.
    */
-  private void initReporting(Set<MetricsReporting> reportingSet) {
-    for (MetricsReporting reporting : reportingSet) {
-      switch(reporting) {
-        case CONSOLE:
-          final ConsoleReporter consoleReporter = ConsoleReporter.forRegistry(metricRegistry)
-            .convertRatesTo(TimeUnit.SECONDS)
-            .convertDurationsTo(TimeUnit.MILLISECONDS)
-            .build();
-          consoleReporter.start(1, TimeUnit.SECONDS);
-          reporters.add(consoleReporter);
-          break;
-        case JMX:
-          final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry)
-            .convertRatesTo(TimeUnit.SECONDS)
-            .convertDurationsTo(TimeUnit.MILLISECONDS)
-            .build();
-          jmxReporter.start();
-          reporters.add(jmxReporter);
-          break;
-        case JSON_FILE:
-          final JsonFileReporter jsonFileReporter = new JsonFileReporter();
-          jsonFileReporter.start();
-          reporters.add(jsonFileReporter);
-          break;
-        case HADOOP2:
-          String applicationName = conf.get(HiveConf.ConfVars.HIVE_METRICS_HADOOP2_COMPONENT_NAME.varname);
-          long reportingInterval = HiveConf.toTime(
-              conf.get(HiveConf.ConfVars.HIVE_METRICS_HADOOP2_INTERVAL.varname),
-              TimeUnit.SECONDS, TimeUnit.SECONDS);
-          final HadoopMetrics2Reporter metrics2Reporter = HadoopMetrics2Reporter.forRegistry(metricRegistry)
-              .convertRatesTo(TimeUnit.SECONDS)
-              .convertDurationsTo(TimeUnit.MILLISECONDS)
-              .build(DefaultMetricsSystem.initialize(applicationName), // The application-level name
-                  applicationName, // Component name
-                  applicationName, // Component description
-                  "General"); // Name for each metric record
-          metrics2Reporter.start(reportingInterval, TimeUnit.SECONDS);
-          break;
-      }
+  private void initReporting() {
+
+    if (!(initCodahaleMetricsReporterClasses() || initMetricsReporter())) {
+      LOGGER.warn("Unable to initialize metrics reporting");
+    }
+    if (reporters.isEmpty()) {
+      // log a warning in case no reporters were successfully added
+      LOGGER.warn("No reporters configured for codahale metrics!");
     }
   }
 
-  class JsonFileReporter implements Closeable {
-    private ObjectMapper jsonMapper = null;
-    private java.util.Timer timer = null;
-
-    public void start() {
-      this.jsonMapper = new ObjectMapper().registerModule(new MetricsModule(TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS, false));
-      this.timer = new java.util.Timer(true);
+  /**
+   * Initializes reporting using HIVE_CODAHALE_METRICS_REPORTER_CLASSES.
+   * @return whether initialization was successful or not
+   */
+  private boolean initCodahaleMetricsReporterClasses() {
 
-      long time = conf.getTimeVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_INTERVAL, TimeUnit.MILLISECONDS);
-      final String pathString = conf.getVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_LOCATION);
+    List<String> reporterClasses = Lists.newArrayList(Splitter.on(",").trimResults().
+        omitEmptyStrings().split(conf.getVar(HiveConf.ConfVars.HIVE_CODAHALE_METRICS_REPORTER_CLASSES)));
+    if (reporterClasses.isEmpty()) {
+      return false;
+    }
 
-      timer.schedule(new TimerTask() {
-        @Override
-        public void run() {
-          BufferedWriter bw = null;
-          try {
-            String json = jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(metricRegistry);
-            Path tmpPath = new Path(pathString + ".tmp");
-            URI tmpPathURI = tmpPath.toUri();
-            FileSystem fs = null;
-            if (tmpPathURI.getScheme() == null && tmpPathURI.getAuthority() == null) {
-              //default local
-              fs = FileSystem.getLocal(conf);
-            } else {
-              fs = FileSystem.get(tmpPathURI, conf);
-            }
-            fs.delete(tmpPath, true);
-            bw = new BufferedWriter(new OutputStreamWriter(fs.create(tmpPath, true)));
-            bw.write(json);
-            bw.close();
-            fs.setPermission(tmpPath, FsPermission.createImmutable((short) 0644));
-
-            Path path = new Path(pathString);
-            fs.rename(tmpPath, path);
-            fs.setPermission(path, FsPermission.createImmutable((short) 0644));
-          } catch (Exception e) {
-            LOGGER.warn("Error writing JSON Metrics to file", e);
-          } finally {
-            try {
-              if (bw != null) {
-                bw.close();
-              }
-            } catch (IOException e) {
-              //Ignore.
-            }
-          }
+    for (String reporterClass : reporterClasses) {
+      Class<?> clazz = null;
+      try {
+        clazz = conf.getClassByName(reporterClass);
+      } catch (ClassNotFoundException e) {
+        LOGGER.error("Unable to load metrics reporter class " + reporterClass +
+            " from conf HIVE_CODAHALE_METRICS_REPORTER_CLASSES", e);
+        throw new IllegalArgumentException(e);
+      }
+      try {
+        Constructor<?> constructor = clazz.getConstructor(MetricRegistry.class, HiveConf.class);
+        CodahaleReporter reporter = (CodahaleReporter) constructor.newInstance(metricRegistry, conf);
+        reporter.start();
+        reporters.add(reporter);
+      } catch (NoSuchMethodException | InstantiationException |
+          IllegalAccessException | InvocationTargetException e) {
+        LOGGER.error("Unable to instantiate using constructor(MetricRegistry, HiveConf) for"
+            + " reporter " + reporterClass + " from conf HIVE_CODAHALE_METRICS_REPORTER_CLASSES",
+            e);
+        throw new IllegalArgumentException(e);
+      }
+    }
+    return true;
+  }
 
+  /**
+   * Initializes reporting using HIVE_METRICS_REPORTER.
+   * @return true if at least one reporter name was configured, false if the conf is empty
+   */
+  private boolean initMetricsReporter() {
 
-        }
-      }, 0, time);
+    List<String> metricsReporterNames = Lists.newArrayList(Splitter.on(",").trimResults().
+        omitEmptyStrings().split(conf.getVar(HiveConf.ConfVars.HIVE_METRICS_REPORTER)));
+    if (metricsReporterNames.isEmpty()) {
+      return false;
     }
 
-    @Override
-    public void close() {
-      if (timer != null) {
-        this.timer.cancel();
+    for (String metricsReportingName : metricsReporterNames) {
+      MetricsReporting reporter;
+      try {
+        reporter = MetricsReporting.valueOf(metricsReportingName.trim().toUpperCase());
+      } catch (IllegalArgumentException e) {
+        LOGGER.error("Invalid reporter name " + metricsReportingName, e);
+        throw e;
+      }
+      CodahaleReporter codahaleReporter = null;
+      switch (reporter) {
+      case CONSOLE:
+        codahaleReporter = new ConsoleMetricsReporter(metricRegistry, conf);
+        break;
+      case JMX:
+        codahaleReporter = new JmxMetricsReporter(metricRegistry, conf);
+        break;
+      case JSON_FILE:
+        codahaleReporter = new JsonFileMetricsReporter(metricRegistry, conf);
+        break;
+      case HADOOP2:
+        codahaleReporter = new Metrics2Reporter(metricRegistry, conf);
+        break;
+      default:
+        LOGGER.warn("Unhandled reporter " + reporter + " provided.");
+      }
+      if (codahaleReporter != null) {
+        codahaleReporter.start();
+        reporters.add(codahaleReporter);
       }
     }
+    return true;
   }
 }

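To make the precedence in initReporting() concrete, here is a minimal configuration sketch for an embedded or test context. The class list and enum names below are illustrative, not defaults; since the class-based conf is non-empty, the legacy HIVE_METRICS_REPORTER line would be ignored:

    // Minimal sketch; values are illustrative.
    HiveConf conf = new HiveConf();
    // Preferred conf: comma-separated, fully qualified CodahaleReporter implementations.
    conf.setVar(HiveConf.ConfVars.HIVE_CODAHALE_METRICS_REPORTER_CLASSES,
        "org.apache.hadoop.hive.common.metrics.metrics2.JsonFileMetricsReporter,"
        + "org.apache.hadoop.hive.common.metrics.metrics2.JmxMetricsReporter");
    // Legacy conf: MetricsReporting enum names; consulted only when the conf above is empty.
    conf.setVar(HiveConf.ConfVars.HIVE_METRICS_REPORTER, "CONSOLE,JMX");
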
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleReporter.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleReporter.java b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleReporter.java
new file mode 100644
index 0000000..9424f28
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleReporter.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common.metrics.metrics2;
+
+import com.codahale.metrics.Reporter;
+import java.io.Closeable;
+
+public interface CodahaleReporter extends Closeable, Reporter {
+
+  /**
+   * Start the reporter.
+   */
+  void start();
+}

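Because initCodahaleMetricsReporterClasses() instantiates reporters reflectively through a (MetricRegistry, HiveConf) constructor, any class implementing this interface and exposing that constructor can be plugged in via HIVE_CODAHALE_METRICS_REPORTER_CLASSES. A hypothetical sketch wrapping Codahale's stock Slf4jReporter; this wrapper class and its package are illustrative, not part of this commit:

    package com.example.metrics; // hypothetical package

    import com.codahale.metrics.MetricRegistry;
    import com.codahale.metrics.Slf4jReporter;
    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.hive.common.metrics.metrics2.CodahaleReporter;
    import org.apache.hadoop.hive.conf.HiveConf;

    public class Slf4jMetricsReporter implements CodahaleReporter {

      private final Slf4jReporter reporter;

      // CodahaleMetrics looks up exactly this constructor shape via reflection.
      public Slf4jMetricsReporter(MetricRegistry registry, HiveConf conf) {
        reporter = Slf4jReporter.forRegistry(registry)
            .convertRatesTo(TimeUnit.SECONDS)
            .convertDurationsTo(TimeUnit.MILLISECONDS)
            .build();
      }

      @Override
      public void start() {
        reporter.start(60, TimeUnit.SECONDS); // report once per minute
      }

      @Override
      public void close() {
        reporter.close();
      }
    }
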
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/ConsoleMetricsReporter.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/ConsoleMetricsReporter.java b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/ConsoleMetricsReporter.java
new file mode 100644
index 0000000..dea1848
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/ConsoleMetricsReporter.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.metrics.metrics2;
+
+import com.codahale.metrics.ConsoleReporter;
+import com.codahale.metrics.MetricRegistry;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+
+/**
+ * A wrapper around Codahale ConsoleReporter to make it a pluggable/configurable Hive Metrics reporter.
+ */
+public class ConsoleMetricsReporter implements CodahaleReporter {
+
+  private final ConsoleReporter reporter;
+
+  public ConsoleMetricsReporter(MetricRegistry registry, HiveConf conf) {
+    reporter = ConsoleReporter.forRegistry(registry)
+        .convertRatesTo(TimeUnit.SECONDS)
+        .convertDurationsTo(TimeUnit.MILLISECONDS)
+        .build();
+  }
+
+  @Override
+  public void start() {
+    reporter.start(1, TimeUnit.SECONDS);
+  }
+
+  @Override
+  public void close() {
+    reporter.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/JmxMetricsReporter.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/JmxMetricsReporter.java b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/JmxMetricsReporter.java
new file mode 100644
index 0000000..f12adf9
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/JmxMetricsReporter.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common.metrics.metrics2;
+
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.MetricRegistry;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+/**
+ * A wrapper around Codahale JmxReporter to make it a pluggable/configurable Hive Metrics reporter.
+ */
+public class JmxMetricsReporter implements CodahaleReporter {
+
+  private final JmxReporter jmxReporter;
+
+  public JmxMetricsReporter(MetricRegistry registry, HiveConf conf) {
+    jmxReporter = JmxReporter.forRegistry(registry)
+        .convertRatesTo(TimeUnit.SECONDS)
+        .convertDurationsTo(TimeUnit.MILLISECONDS)
+        .build();
+  }
+
+  @Override
+  public void start() {
+    jmxReporter.start();
+  }
+
+  @Override
+  public void close() {
+    jmxReporter.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/JsonFileMetricsReporter.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/JsonFileMetricsReporter.java b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/JsonFileMetricsReporter.java
new file mode 100644
index 0000000..c07517a
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/JsonFileMetricsReporter.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.metrics.metrics2;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.json.MetricsModule;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.net.URI;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A metrics reporter for CodahaleMetrics that dumps metrics periodically into a file in JSON format.
+ */
+public class JsonFileMetricsReporter implements CodahaleReporter {
+
+  private final MetricRegistry metricRegistry;
+  private final ObjectWriter jsonWriter;
+  private final ScheduledExecutorService executorService;
+  private final HiveConf conf;
+  private final long interval;
+  private final String pathString;
+  private final Path path;
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(JsonFileMetricsReporter.class);
+
+  public JsonFileMetricsReporter(MetricRegistry registry, HiveConf conf) {
+    this.metricRegistry = registry;
+    this.jsonWriter =
+        new ObjectMapper().registerModule(new MetricsModule(TimeUnit.MILLISECONDS,
+            TimeUnit.MILLISECONDS, false)).writerWithDefaultPrettyPrinter();
+    executorService = Executors.newSingleThreadScheduledExecutor();
+    this.conf = conf;
+
+    interval = conf.getTimeVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_INTERVAL, TimeUnit.MILLISECONDS);
+    pathString = conf.getVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_LOCATION);
+    path = new Path(pathString);
+  }
+
+  @Override
+  public void start() {
+
+    final Path tmpPath = new Path(pathString + ".tmp");
+    URI tmpPathURI = tmpPath.toUri();
+    final FileSystem fs;
+    try {
+      if (tmpPathURI.getScheme() == null && tmpPathURI.getAuthority() == null) {
+        //default local
+        fs = FileSystem.getLocal(conf);
+      } else {
+        fs = FileSystem.get(tmpPathURI, conf);
+      }
+    } catch (IOException e) {
+      LOGGER.error("Unable to access filesystem for path " + tmpPath + ". Aborting reporting", e);
+      return;
+    }
+
+    Runnable task = new Runnable() {
+      public void run() {
+        try {
+          String json = null;
+          try {
+            json = jsonWriter.writeValueAsString(metricRegistry);
+          } catch (JsonProcessingException e) {
+            LOGGER.error("Unable to convert json to string ", e);
+            return;
+          }
+
+          BufferedWriter bw = null;
+          try {
+            fs.delete(tmpPath, true);
+            bw = new BufferedWriter(new OutputStreamWriter(fs.create(tmpPath, true)));
+            bw.write(json);
+            fs.setPermission(tmpPath, FsPermission.createImmutable((short) 0644));
+          } catch (IOException e) {
+            LOGGER.error("Unable to write to temp file " + tmpPath, e);
+            return;
+          } finally {
+            if (bw != null) {
+              bw.close();
+            }
+          }
+
+          try {
+            fs.rename(tmpPath, path);
+            fs.setPermission(path, FsPermission.createImmutable((short) 0644));
+          } catch (IOException e) {
+            LOGGER.error("Unable to rename temp file " + tmpPath + " to " + pathString, e);
+            return;
+          }
+        } catch (Throwable t) {
+          // catch all throwables (errors and exceptions) so one failed run does not suppress subsequent runs
+          LOGGER.error("Error executing scheduled task", t);
+        }
+      }
+    };
+
+    executorService.scheduleWithFixedDelay(task, 0, interval, TimeUnit.MILLISECONDS);
+  }
+
+  @Override
+  public void close() {
+    executorService.shutdown();
+  }
+}

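Note the publish pattern above: the report is written to a ".tmp" sibling and then renamed over the target path, so a reader never observes a partially written file. A hedged consumer sketch, assuming the default local HIVE_METRICS_JSON_FILE_LOCATION of /tmp/report.json and Dropwizard's standard JSON layout (the helper class name here is hypothetical):

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.io.File;
    import java.io.IOException;

    public class MetricsFileReader { // hypothetical helper, not part of this commit
      public static void main(String[] args) throws IOException {
        // Illustrative path: the conf default; deployments may point elsewhere.
        ObjectMapper mapper = new ObjectMapper();
        JsonNode root = mapper.readTree(new File("/tmp/report.json"));
        // MetricsModule emits top-level "gauges", "counters", "histograms", "meters", "timers".
        JsonNode counters = root.get("counters");
        if (counters != null) {
          counters.fieldNames().forEachRemaining(System.out::println);
        }
      }
    }
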
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/Metrics2Reporter.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/Metrics2Reporter.java b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/Metrics2Reporter.java
new file mode 100644
index 0000000..3b402d8
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/Metrics2Reporter.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common.metrics.metrics2;
+
+import com.codahale.metrics.MetricRegistry;
+import com.github.joshelser.dropwizard.metrics.hadoop.HadoopMetrics2Reporter;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+
+/**
+ * A wrapper around Codahale HadoopMetrics2Reporter to make it a pluggable/configurable Hive Metrics reporter.
+ */
+public class Metrics2Reporter implements CodahaleReporter {
+
+  private final MetricRegistry metricRegistry;
+  private final HiveConf conf;
+  private final HadoopMetrics2Reporter reporter;
+
+  public Metrics2Reporter(MetricRegistry registry, HiveConf conf) {
+    this.metricRegistry = registry;
+    this.conf = conf;
+    String applicationName = conf.get(HiveConf.ConfVars.HIVE_METRICS_HADOOP2_COMPONENT_NAME.varname);
+
+    reporter = HadoopMetrics2Reporter.forRegistry(metricRegistry)
+        .convertRatesTo(TimeUnit.SECONDS)
+        .convertDurationsTo(TimeUnit.MILLISECONDS)
+        .build(DefaultMetricsSystem.initialize(applicationName), // The application-level name
+            applicationName, // Component name
+            applicationName, // Component description
+            "General"); // Name for each metric record
+  }
+
+  @Override
+  public void start() {
+    long reportingInterval =
+        HiveConf.toTime(conf.get(HiveConf.ConfVars.HIVE_METRICS_HADOOP2_INTERVAL.varname), TimeUnit.SECONDS, TimeUnit.SECONDS);
+    reporter.start(reportingInterval, TimeUnit.SECONDS);
+  }
+
+  @Override
+  public void close() {
+    reporter.close();
+  }
+}