You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/04/18 11:19:42 UTC

[20/51] [partial] TAJO-752: Escalate sub modules in tajo-core into the top-level modules. (hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/session/SessionEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionEventType.java b/tajo-core/src/main/java/org/apache/tajo/master/session/SessionEventType.java
new file mode 100644
index 0000000..64c6fc6
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/session/SessionEventType.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.session;
+
+public enum SessionEventType {
+  EXPIRE,
+  PING
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/session/SessionLivelinessMonitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionLivelinessMonitor.java b/tajo-core/src/main/java/org/apache/tajo/master/session/SessionLivelinessMonitor.java
new file mode 100644
index 0000000..483920f
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/session/SessionLivelinessMonitor.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.session;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tajo.conf.TajoConf;
+
+public class SessionLivelinessMonitor extends AbstractLivelinessMonitor<String> {
+
+  private EventHandler dispatcher;
+
+  public SessionLivelinessMonitor(Dispatcher d) {
+    super(SessionLivelinessMonitor.class.getSimpleName(), new SystemClock());
+    this.dispatcher = d.getEventHandler();
+  }
+
+  public void serviceInit(Configuration conf) throws Exception {
+    Preconditions.checkArgument(conf instanceof TajoConf);
+    TajoConf systemConf = (TajoConf) conf;
+
+    // seconds
+    int expireIntvl = systemConf.getIntVar(TajoConf.ConfVars.CLIENT_SESSION_EXPIRY_TIME);
+    setExpireInterval(expireIntvl);
+    setMonitorInterval(expireIntvl / 3);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void expire(String id) {
+    dispatcher.handle(new SessionEvent(id, SessionEventType.EXPIRE));
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/session/SessionManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionManager.java b/tajo-core/src/main/java/org/apache/tajo/master/session/SessionManager.java
new file mode 100644
index 0000000..24df9d8
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/session/SessionManager.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.session;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class SessionManager extends CompositeService implements EventHandler<SessionEvent> {
+  private static final Log LOG = LogFactory.getLog(SessionManager.class);
+
+  public final ConcurrentHashMap<String, Session> sessions = new ConcurrentHashMap<String, Session>();
+  private final Dispatcher dispatcher;
+  private SessionLivelinessMonitor sessionLivelinessMonitor;
+
+
+  public SessionManager(Dispatcher dispatcher) {
+    super(SessionManager.class.getSimpleName());
+    this.dispatcher = dispatcher;
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    sessionLivelinessMonitor = new SessionLivelinessMonitor(dispatcher);
+    addIfService(sessionLivelinessMonitor);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public void serviceStop() throws Exception {
+    super.serviceStop();
+  }
+
+  private void assertSessionExistence(String sessionId) throws InvalidSessionException {
+    if (!sessions.containsKey(sessionId)) {
+      throw new InvalidSessionException(sessionId);
+    }
+  }
+
+  public String createSession(String username, String baseDatabaseName) throws InvalidSessionException {
+    String sessionId;
+    Session oldSession;
+
+    sessionId = UUID.randomUUID().toString();
+    Session newSession = new Session(sessionId, username, baseDatabaseName);
+    oldSession = sessions.putIfAbsent(sessionId, newSession);
+    if (oldSession != null) {
+      throw new InvalidSessionException("Session id is duplicated: " + oldSession.getSessionId());
+    }
+    LOG.info("Session " + sessionId + " is created." );
+    return sessionId;
+  }
+
+  public void removeSession(String sessionId) {
+    if (sessions.containsKey(sessionId)) {
+      sessions.remove(sessionId);
+      LOG.info("Session " + sessionId + " is removed.");
+    } else {
+      LOG.error("No such session id: " + sessionId);
+    }
+  }
+
+  public Session getSession(String sessionId) throws InvalidSessionException {
+    assertSessionExistence(sessionId);
+    touch(sessionId);
+    return sessions.get(sessionId);
+  }
+
+  public void setVariable(String sessionId, String name, String value) throws InvalidSessionException {
+    assertSessionExistence(sessionId);
+    touch(sessionId);
+    sessions.get(sessionId).setVariable(name, value);
+  }
+
+  public String getVariable(String sessionId, String name)
+      throws InvalidSessionException, NoSuchSessionVariableException {
+    assertSessionExistence(sessionId);
+    touch(sessionId);
+    return sessions.get(sessionId).getVariable(name);
+  }
+
+  public void removeVariable(String sessionId, String name) throws InvalidSessionException {
+    assertSessionExistence(sessionId);
+    touch(sessionId);
+    sessions.get(sessionId).removeVariable(name);
+  }
+
+  public Map<String, String> getAllVariables(String sessionId) throws InvalidSessionException {
+    assertSessionExistence(sessionId);
+    touch(sessionId);
+    return sessions.get(sessionId).getAllVariables();
+  }
+
+  public void touch(String sessionId) throws InvalidSessionException {
+    assertSessionExistence(sessionId);
+    sessions.get(sessionId).updateLastAccessTime();
+    sessionLivelinessMonitor.receivedPing(sessionId);
+  }
+
+  @Override
+  public void handle(SessionEvent event) {
+    LOG.info("Processing " + event.getSessionId() + " of type " + event.getType());
+
+    try {
+      assertSessionExistence(event.getSessionId());
+      touch(event.getSessionId());
+    } catch (InvalidSessionException e) {
+      LOG.error(e);
+    }
+
+    if (event.getType() == SessionEventType.EXPIRE) {
+      Session session = sessions.remove(event.getSessionId());
+      LOG.info("[Expired] Session username=" + session.getUserName() + ",sessionid=" + event.getSessionId());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/net/CachedDNSResolver.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/net/CachedDNSResolver.java b/tajo-core/src/main/java/org/apache/tajo/net/CachedDNSResolver.java
new file mode 100644
index 0000000..2a53c47
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/net/CachedDNSResolver.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.net;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class CachedDNSResolver {
+  private static Map<String, String> hostNameToIPAddrMap
+      = new ConcurrentHashMap<String, String>();
+
+  private static CachedDNSResolver instance;
+
+  static {
+    instance = new CachedDNSResolver();
+  }
+
+  public static String resolve(String hostName) {
+
+    if (hostNameToIPAddrMap.containsKey(hostName)) {
+      return hostNameToIPAddrMap.get(hostName);
+    }
+
+    String ipAddress = null;
+    try {
+      ipAddress = InetAddress.getByName(hostName).getHostAddress();
+    } catch (UnknownHostException e) {
+      e.printStackTrace();
+    }
+    hostNameToIPAddrMap.put(hostName, ipAddress);
+
+    return ipAddress;
+  }
+
+  public static String [] resolve(String [] hostNames) {
+    if (hostNames == null) {
+      return null;
+    }
+
+    String [] resolved = new String[hostNames.length];
+    for (int i = 0; i < hostNames.length; i++) {
+      resolved[i] = resolve(hostNames[i]);
+    }
+    return resolved;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/util/ApplicationIdUtils.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/ApplicationIdUtils.java b/tajo-core/src/main/java/org/apache/tajo/util/ApplicationIdUtils.java
new file mode 100644
index 0000000..1db0c3b
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/ApplicationIdUtils.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+
+public class ApplicationIdUtils {
+  public static ApplicationAttemptId createApplicationAttemptId(QueryId queryId, int attemptId) {
+    return BuilderUtils.newApplicationAttemptId(queryIdToAppId(queryId), attemptId);
+  }
+
+  public static ApplicationAttemptId createApplicationAttemptId(QueryId queryId) {
+    return BuilderUtils.newApplicationAttemptId(queryIdToAppId(queryId), 1);
+  }
+
+  public static ApplicationId queryIdToAppId(QueryId queryId) {
+    return BuilderUtils.newApplicationId(Long.parseLong(queryId.getId()), queryId.getSeq());
+  }
+
+  public static QueryId appIdToQueryId(YarnProtos.ApplicationIdProto appId) {
+    return QueryIdFactory.newQueryId(appId.getClusterTimestamp(), appId.getId());
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/util/ClassUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/ClassUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/ClassUtil.java
new file mode 100644
index 0000000..160b585
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/ClassUtil.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.File;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.jar.JarEntry;
+import java.util.jar.JarFile;
+
+public abstract class ClassUtil {
+  private static final Log LOG = LogFactory.getLog(ClassUtil.class);
+
+  public static Set<Class> findClasses(Class type, String packageFilter) {
+    Set<Class> classSet = new HashSet<Class>();
+
+    String classpath = System.getProperty("java.class.path");
+    String[] paths = classpath.split(System.getProperty("path.separator"));
+
+    for (String path : paths) {
+      File file = new File(path);
+      if (file.exists()) {
+        findClasses(classSet, file, file, true, type, packageFilter);
+      }
+    }
+
+    return classSet;
+  }
+
+  private static void findClasses(Set<Class> matchedClassSet, File root, File file, boolean includeJars, Class type, String packageFilter) {
+    if (file.isDirectory()) {
+      for (File child : file.listFiles()) {
+        findClasses(matchedClassSet, root, child, includeJars, type, packageFilter);
+      }
+    } else {
+      if (file.getName().toLowerCase().endsWith(".jar") && includeJars) {
+        JarFile jar = null;
+        try {
+          jar = new JarFile(file);
+        } catch (Exception ex) {
+          LOG.error(ex.getMessage(), ex);
+          return;
+        }
+        Enumeration<JarEntry> entries = jar.entries();
+        while (entries.hasMoreElements()) {
+          JarEntry entry = entries.nextElement();
+          String name = entry.getName();
+          int extIndex = name.lastIndexOf(".class");
+          if (extIndex > 0) {
+            String qualifiedClassName = name.substring(0, extIndex).replace("/", ".");
+            if (qualifiedClassName.indexOf(packageFilter) >= 0 && !isTestClass(qualifiedClassName)) {
+              try {
+                Class clazz = Class.forName(qualifiedClassName);
+
+                if (!clazz.isInterface() && isMatch(type, clazz)) {
+                  matchedClassSet.add(clazz);
+                }
+              } catch (ClassNotFoundException e) {
+                LOG.error(e.getMessage(), e);
+              }
+            }
+          }
+        }
+      } else if (file.getName().toLowerCase().endsWith(".class")) {
+        String qualifiedClassName = createClassName(root, file);
+        if (qualifiedClassName.indexOf(packageFilter) >= 0 && !isTestClass(qualifiedClassName)) {
+          try {
+            Class clazz = Class.forName(qualifiedClassName);
+            if (!clazz.isInterface() && isMatch(type, clazz)) {
+              matchedClassSet.add(clazz);
+            }
+          } catch (ClassNotFoundException e) {
+            LOG.error(e.getMessage(), e);
+          }
+        }
+      }
+    }
+  }
+
+  private static boolean isTestClass(String qualifiedClassName) {
+    String className = getClassName(qualifiedClassName);
+    if(className == null) {
+      return false;
+    }
+
+    return className.startsWith("Test");
+  }
+
+  private static boolean isMatch(Class targetClass, Class loadedClass) {
+    if (targetClass.equals(loadedClass)) {
+      return true;
+    }
+
+    Class[] classInterfaces = loadedClass.getInterfaces();
+    if (classInterfaces != null) {
+      for (Class eachInterfaceClass : classInterfaces) {
+        if (eachInterfaceClass.equals(targetClass)) {
+          return true;
+        }
+
+        if (isMatch(targetClass, eachInterfaceClass)) {
+          return true;
+        }
+      }
+    }
+
+    Class superClass = loadedClass.getSuperclass();
+    if (superClass != null) {
+      if (isMatch(targetClass, superClass)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private static String getClassName(String qualifiedClassName) {
+    String[] tokens = qualifiedClassName.split("\\.");
+    if (tokens.length == 0) {
+      return qualifiedClassName;
+    }
+    return tokens[tokens.length - 1];
+  }
+
+  private static String createClassName(File root, File file) {
+    StringBuffer sb = new StringBuffer();
+    String fileName = file.getName();
+    sb.append(fileName.substring(0, fileName.lastIndexOf(".class")));
+    file = file.getParentFile();
+    while (file != null && !file.equals(root)) {
+      sb.insert(0, '.').insert(0, file.getName());
+      file = file.getParentFile();
+    }
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/util/GeoIPUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/GeoIPUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/GeoIPUtil.java
new file mode 100644
index 0000000..859b37d
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/GeoIPUtil.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+import com.maxmind.geoip.LookupService;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+
+import java.io.IOException;
+
+public class GeoIPUtil {
+  private static final Log LOG = LogFactory.getLog(GeoIPUtil.class);
+  private static LookupService lookup;
+
+  static {
+    try {
+      TajoConf conf = new TajoConf();
+      lookup = new LookupService(conf.getVar(ConfVars.GEOIP_DATA),
+          LookupService.GEOIP_MEMORY_CACHE);
+    } catch (IOException e) {
+      LOG.error("Cannot open the geoip data", e);
+    }
+  }
+
+  public static String getCountryCode(String host) {
+    return lookup.getCountry(host).getCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java
new file mode 100644
index 0000000..8816f8f
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+import com.google.gson.Gson;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.engine.eval.*;
+import org.apache.tajo.engine.json.CoreGsonHelper;
+import org.apache.tajo.engine.planner.LogicalPlan;
+import org.apache.tajo.engine.planner.logical.IndexScanNode;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.storage.fragment.FileFragment;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map.Entry;
+
+public class IndexUtil {
+  public static String getIndexNameOfFrag(FileFragment fragment, SortSpec[] keys) {
+    StringBuilder builder = new StringBuilder(); 
+    builder.append(fragment.getPath().getName() + "_");
+    builder.append(fragment.getStartKey() + "_" + fragment.getEndKey() + "_");
+    for(int i = 0 ; i < keys.length ; i ++) {
+      builder.append(keys[i].getSortKey().getSimpleName()+"_");
+    }
+    builder.append("_index");
+    return builder.toString();
+       
+  }
+  
+  public static String getIndexName(String indexName , SortSpec[] keys) {
+    StringBuilder builder = new StringBuilder();
+    builder.append(indexName + "_");
+    for(int i = 0 ; i < keys.length ; i ++) {
+      builder.append(keys[i].getSortKey().getSimpleName() + "_");
+    }
+    return builder.toString();
+  }
+  
+  public static IndexScanNode indexEval(LogicalPlan plan, ScanNode scanNode,
+      Iterator<Entry<String, String>> iter ) {
+   
+    EvalNode qual = scanNode.getQual();
+    Gson gson = CoreGsonHelper.getInstance();
+    
+    FieldAndValueFinder nodeFinder = new FieldAndValueFinder();
+    qual.preOrder(nodeFinder);
+    LinkedList<EvalNode> nodeList = nodeFinder.getNodeList();
+    
+    int maxSize = Integer.MIN_VALUE;
+    SortSpec[] maxIndex = null;
+    
+    String json;
+    while(iter.hasNext()) {
+      Entry<String , String> entry = iter.next();
+      json = entry.getValue();
+      SortSpec[] sortKey = gson.fromJson(json, SortSpec[].class);
+      if(sortKey.length > nodeList.size()) {
+        /* If the number of the sort key is greater than where condition, 
+         * this index cannot be used
+         * */
+        continue; 
+      } else {
+        boolean[] equal = new boolean[sortKey.length];
+        for(int i = 0 ; i < sortKey.length ; i ++) {
+          for(int j = 0 ; j < nodeList.size() ; j ++) {
+            Column col = ((FieldEval)(nodeList.get(j).getLeftExpr())).getColumnRef();
+            if(col.equals(sortKey[i].getSortKey())) {
+              equal[i] = true;
+            }
+          }
+        }
+        boolean chk = true;
+        for(int i = 0 ; i < equal.length ; i ++) {
+          chk = chk && equal[i];
+        }
+        if(chk) {
+          if(maxSize < sortKey.length) {
+            maxSize = sortKey.length;
+            maxIndex = sortKey;
+          }
+        }
+      }
+    }
+    if(maxIndex == null) {
+      return null;
+    } else {
+      Schema keySchema = new Schema();
+      for(int i = 0 ; i < maxIndex.length ; i ++ ) {
+        keySchema.addColumn(maxIndex[i].getSortKey());
+      }
+      Datum[] datum = new Datum[nodeList.size()];
+      for(int i = 0 ; i < nodeList.size() ; i ++ ) {
+        datum[i] = ((ConstEval)(nodeList.get(i).getRightExpr())).getValue();
+      }
+      
+      return new IndexScanNode(plan.newPID(), scanNode, keySchema , datum , maxIndex);
+    }
+
+  }
+  
+  
+  private static class FieldAndValueFinder implements EvalNodeVisitor {
+    private LinkedList<EvalNode> nodeList = new LinkedList<EvalNode>();
+    
+    public LinkedList<EvalNode> getNodeList () {
+      return this.nodeList;
+    }
+    
+    @Override
+    public void visit(EvalNode node) {
+      switch(node.getType()) {
+      case AND:
+        break;
+      case EQUAL:
+        if( node.getLeftExpr().getType() == EvalType.FIELD
+          && node.getRightExpr().getType() == EvalType.CONST ) {
+          nodeList.add(node);
+        }
+        break;
+      case IS_NULL:
+        if( node.getLeftExpr().getType() == EvalType.FIELD
+          && node.getRightExpr().getType() == EvalType.CONST) {
+          nodeList.add(node);
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
new file mode 100644
index 0000000..58a3550
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.catalog.FunctionDesc;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.master.querymaster.QueryInProgress;
+import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.master.querymaster.QueryUnit;
+import org.apache.tajo.master.querymaster.SubQuery;
+import org.apache.tajo.worker.TaskRunner;
+
+import java.text.DecimalFormat;
+import java.util.*;
+
+import static org.apache.tajo.conf.TajoConf.ConfVars;
+
+public class JSPUtil {
+  static DecimalFormat decimalF = new DecimalFormat("###.0");
+
+  public static void sortQueryUnit(QueryUnit[] queryUnits, String sortField, String sortOrder) {
+    if(sortField == null || sortField.isEmpty()) {
+      sortField = "id";
+    }
+
+    Arrays.sort(queryUnits, new QueryUnitComparator(sortField, "asc".equals(sortOrder)));
+  }
+
+  public static void sortTaskRunner(List<TaskRunner> taskRunners) {
+    Collections.sort(taskRunners, new Comparator<TaskRunner>() {
+      @Override
+      public int compare(TaskRunner taskRunner, TaskRunner taskRunner2) {
+        return taskRunner.getId().compareTo(taskRunner2.getId());
+      }
+    });
+  }
+
+  public static String getElapsedTime(long startTime, long finishTime) {
+    if(startTime == 0) {
+      return "-";
+    }
+    return finishTime == 0 ? decimalF.format((System.currentTimeMillis() - startTime) / 1000) + " sec"
+        : decimalF.format((finishTime - startTime) / 1000) + " sec";
+  }
+
+  public static String getTajoMasterHttpAddr(Configuration config) {
+    try {
+      TajoConf conf = (TajoConf) config;
+      String [] masterAddr = conf.getVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS).split(":");
+      return masterAddr[0] + ":" + conf.getVar(ConfVars.TAJO_MASTER_INFO_ADDRESS).split(":")[1];
+    } catch (Exception e) {
+      e.printStackTrace();
+      return e.getMessage();
+    }
+  }
+
+  public static List<QueryMasterTask> sortQueryMasterTask(Collection<QueryMasterTask> queryMasterTasks,
+                                                          final boolean desc) {
+    List<QueryMasterTask> queryMasterTaskList = new ArrayList<QueryMasterTask>(queryMasterTasks);
+
+    Collections.sort(queryMasterTaskList, new Comparator<QueryMasterTask>() {
+
+      @Override
+      public int compare(QueryMasterTask task1, QueryMasterTask task2) {
+        if(desc) {
+          return task2.getQueryId().toString().compareTo(task1.getQueryId().toString());
+        } else {
+          return task1.getQueryId().toString().compareTo(task2.getQueryId().toString());
+        }
+      }
+    });
+
+    return queryMasterTaskList;
+  }
+
+  public static List<QueryInProgress> sortQueryInProgress(Collection<QueryInProgress> queryInProgresses,
+                                                          final boolean desc) {
+    List<QueryInProgress> queryProgressList = new ArrayList<QueryInProgress>(queryInProgresses);
+
+    Collections.sort(queryProgressList, new Comparator<QueryInProgress>() {
+      @Override
+      public int compare(QueryInProgress query1, QueryInProgress query2) {
+        if(desc) {
+          return query2.getQueryId().toString().compareTo(query1.getQueryId().toString());
+        } else {
+          return query1.getQueryId().toString().compareTo(query2.getQueryId().toString());
+        }
+      }
+    });
+
+    return queryProgressList;
+  }
+
+  public static List<SubQuery> sortSubQuery(Collection<SubQuery> subQueries) {
+    List<SubQuery> subQueryList = new ArrayList<SubQuery>(subQueries);
+    Collections.sort(subQueryList, new Comparator<SubQuery>() {
+      @Override
+      public int compare(SubQuery subQuery1, SubQuery subQuery2) {
+        long q1StartTime = subQuery1.getStartTime();
+        long q2StartTime = subQuery2.getStartTime();
+
+        q1StartTime = (q1StartTime == 0 ? Long.MAX_VALUE : q1StartTime);
+        q2StartTime = (q2StartTime == 0 ? Long.MAX_VALUE : q2StartTime);
+
+        int result = compareLong(q1StartTime, q2StartTime);
+        if (result == 0) {
+          return subQuery1.getId().toString().compareTo(subQuery2.getId().toString());
+        } else {
+          return result;
+        }
+      }
+    });
+
+    return subQueryList;
+  }
+
+  static class QueryUnitComparator implements Comparator<QueryUnit> {
+    private String sortField;
+    private boolean asc;
+    public QueryUnitComparator(String sortField, boolean asc) {
+      this.sortField = sortField;
+      this.asc = asc;
+    }
+
+    @Override
+    public int compare(QueryUnit queryUnit, QueryUnit queryUnit2) {
+      if(asc) {
+        if("id".equals(sortField)) {
+          return queryUnit.getId().compareTo(queryUnit2.getId());
+        } else if("host".equals(sortField)) {
+          String host1 = queryUnit.getSucceededHost() == null ? "-" : queryUnit.getSucceededHost();
+          String host2 = queryUnit2.getSucceededHost() == null ? "-" : queryUnit2.getSucceededHost();
+          return host1.compareTo(host2);
+        } else if("runTime".equals(sortField)) {
+          return compareLong(queryUnit.getRunningTime(), queryUnit2.getRunningTime());
+        } else if("startTime".equals(sortField)) {
+          return compareLong(queryUnit.getLaunchTime(), queryUnit2.getLaunchTime());
+        } else {
+          return queryUnit.getId().compareTo(queryUnit2.getId());
+        }
+      } else {
+        if("id".equals(sortField)) {
+          return queryUnit2.getId().compareTo(queryUnit.getId());
+        } else if("host".equals(sortField)) {
+          String host1 = queryUnit.getSucceededHost() == null ? "-" : queryUnit.getSucceededHost();
+          String host2 = queryUnit2.getSucceededHost() == null ? "-" : queryUnit2.getSucceededHost();
+          return host2.compareTo(host1);
+        } else if("runTime".equals(sortField)) {
+          if(queryUnit2.getLaunchTime() == 0) {
+            return -1;
+          } else if(queryUnit.getLaunchTime() == 0) {
+            return 1;
+          }
+          return compareLong(queryUnit2.getRunningTime(), queryUnit.getRunningTime());
+        } else if("startTime".equals(sortField)) {
+          return compareLong(queryUnit2.getLaunchTime(), queryUnit.getLaunchTime());
+        } else {
+          return queryUnit2.getId().compareTo(queryUnit.getId());
+        }
+      }
+    }
+  }
+
+  static int compareLong(long a, long b) {
+    if(a > b) {
+      return 1;
+    } else if(a < b) {
+      return -1;
+    } else {
+      return 0;
+    }
+  }
+
+  public static void sortFunctionDesc(List<FunctionDesc> functions) {
+    Collections.sort(functions, new java.util.Comparator<FunctionDesc>() {
+      @Override
+      public int compare(FunctionDesc f1, FunctionDesc f2) {
+        int nameCompared = f1.getSignature().compareTo(f2.getSignature());
+        if(nameCompared != 0) {
+          return nameCompared;
+        } else {
+          return f1.getReturnType().getType().compareTo(f2.getReturnType().getType());
+        }
+      }
+    });
+  }
+
+  static final DecimalFormat PERCENT_FORMAT = new DecimalFormat("###.0");
+  public static String percentFormat(float value) {
+    return PERCENT_FORMAT.format(value * 100.0f);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/util/metrics/GroupNameMetricsFilter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/metrics/GroupNameMetricsFilter.java b/tajo-core/src/main/java/org/apache/tajo/util/metrics/GroupNameMetricsFilter.java
new file mode 100644
index 0000000..a273475
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/metrics/GroupNameMetricsFilter.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics;
+
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricFilter;
+
+public class GroupNameMetricsFilter implements MetricFilter {
+  String groupName;
+
+  public GroupNameMetricsFilter(String groupName) {
+    this.groupName = groupName;
+  }
+  @Override
+  public boolean matches(String name, Metric metric) {
+    if(name != null) {
+      String[] tokens = name.split("\\.");
+      if(groupName.equals(tokens[0])) {
+        return true;
+      } else {
+        return false;
+      }
+    } else {
+      return false;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/util/metrics/LogEventGaugeSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/metrics/LogEventGaugeSet.java b/tajo-core/src/main/java/org/apache/tajo/util/metrics/LogEventGaugeSet.java
new file mode 100644
index 0000000..6e130ff
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/metrics/LogEventGaugeSet.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricSet;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class LogEventGaugeSet implements MetricSet {
+
+  @Override
+  public Map<String, Metric> getMetrics() {
+    final Map<String, Metric> gauges = new HashMap<String, Metric>();
+
+    gauges.put("Fatal", new Gauge<Long>() {
+      @Override
+      public Long getValue() {
+        return TajoLogEventCounter.getFatal();
+      }
+    });
+
+    gauges.put("Error", new Gauge<Long>() {
+      @Override
+      public Long getValue() {
+        return TajoLogEventCounter.getError();
+      }
+    });
+
+    gauges.put("Warn", new Gauge<Long>() {
+      @Override
+      public Long getValue() {
+        return TajoLogEventCounter.getWarn();
+      }
+    });
+
+    gauges.put("Info", new Gauge<Long>() {
+      @Override
+      public Long getValue() {
+        return TajoLogEventCounter.getInfo();
+      }
+    });
+
+    return gauges;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/util/metrics/MetricsFilterList.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/metrics/MetricsFilterList.java b/tajo-core/src/main/java/org/apache/tajo/util/metrics/MetricsFilterList.java
new file mode 100644
index 0000000..b2fc6e4
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/metrics/MetricsFilterList.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics;
+
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricFilter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class MetricsFilterList implements MetricFilter {
+  List<MetricFilter> filters = new ArrayList<MetricFilter>();
+
+  public void addMetricFilter(MetricFilter filter) {
+    filters.add(filter);
+  }
+
+  @Override
+  public boolean matches(String name, Metric metric) {
+    for (MetricFilter eachFilter: filters) {
+      if (!eachFilter.matches(name, metric)) {
+        return false;
+      }
+    }
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/util/metrics/RegexpMetricsFilter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/metrics/RegexpMetricsFilter.java b/tajo-core/src/main/java/org/apache/tajo/util/metrics/RegexpMetricsFilter.java
new file mode 100644
index 0000000..4faa3e7
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/metrics/RegexpMetricsFilter.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics;
+
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricFilter;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.regex.Pattern;
+
+public class RegexpMetricsFilter implements MetricFilter {
+  List<Pattern> filterPatterns = new ArrayList<Pattern>();
+
+  public RegexpMetricsFilter(Collection<String> filterExpressions) {
+    for(String eachExpression: filterExpressions) {
+      filterPatterns.add(Pattern.compile(eachExpression));
+    }
+  }
+
+  @Override
+  public boolean matches(String name, Metric metric) {
+    if(filterPatterns.isEmpty()) {
+      return true;
+    }
+
+    for(Pattern eachPattern: filterPatterns) {
+      if(eachPattern.matcher(name).find()) {
+        return true;
+      }
+    }
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/util/metrics/TajoLogEventCounter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/metrics/TajoLogEventCounter.java b/tajo-core/src/main/java/org/apache/tajo/util/metrics/TajoLogEventCounter.java
new file mode 100644
index 0000000..3e44b02
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/metrics/TajoLogEventCounter.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics;
+
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Level;
+import org.apache.log4j.spi.LoggingEvent;
+
+public class TajoLogEventCounter extends AppenderSkeleton {
+  private static final int FATAL = 0;
+  private static final int ERROR = 1;
+  private static final int WARN = 2;
+  private static final int INFO = 3;
+
+  private static class EventCounts {
+
+    private final long[] counts = {0, 0, 0, 0};
+
+    private synchronized void incr(int i) {
+      ++counts[i];
+    }
+
+    private synchronized long get(int i) {
+      return counts[i];
+    }
+  }
+
+  private static EventCounts counts = new EventCounts();
+
+  public static long getFatal() {
+    return counts.get(FATAL);
+  }
+
+  public static long getError() {
+    return counts.get(ERROR);
+  }
+
+  public static long getWarn() {
+    return counts.get(WARN);
+  }
+
+  public static long getInfo() {
+    return counts.get(INFO);
+  }
+
+  @Override
+  public void append(LoggingEvent event) {
+    Level level = event.getLevel();
+    String levelStr = level.toString();
+
+    if (level == Level.INFO || "INFO".equalsIgnoreCase(levelStr)) {
+      counts.incr(INFO);
+    } else if (level == Level.WARN || "WARN".equalsIgnoreCase(levelStr)) {
+      counts.incr(WARN);
+    } else if (level == Level.ERROR || "ERROR".equalsIgnoreCase(levelStr)) {
+      counts.incr(ERROR);
+    } else if (level == Level.FATAL || "FATAL".equalsIgnoreCase(levelStr)) {
+      counts.incr(FATAL);
+    }
+  }
+
+  @Override
+  public void close() {
+  }
+
+  @Override
+  public boolean requiresLayout() {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/util/metrics/TajoMetrics.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/metrics/TajoMetrics.java b/tajo-core/src/main/java/org/apache/tajo/util/metrics/TajoMetrics.java
new file mode 100644
index 0000000..0e378b2
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/metrics/TajoMetrics.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics;
+
+import com.codahale.metrics.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.util.metrics.reporter.TajoMetricsReporter;
+
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class TajoMetrics {
+  private static final Log LOG = LogFactory.getLog(TajoMetrics.class);
+
+  protected MetricRegistry metricRegistry;
+  protected AtomicBoolean stop = new AtomicBoolean(false);
+  protected String metricsGroupName;
+
+  public TajoMetrics(String metricsGroupName) {
+    this.metricsGroupName = metricsGroupName;
+    this.metricRegistry = new MetricRegistry();
+  }
+
+  public void stop() {
+    stop.set(true);
+  }
+
+  public MetricRegistry getRegistry() {
+    return metricRegistry;
+  }
+
+  public void report(TajoMetricsReporter reporter) {
+    try {
+      reporter.report(metricRegistry.getGauges(),
+          metricRegistry.getCounters(),
+          metricRegistry.getHistograms(),
+          metricRegistry.getMeters(),
+          metricRegistry.getTimers());
+    } catch (Exception e) {
+      if(LOG.isDebugEnabled()) {
+        LOG.warn("Metric report error:" + e.getMessage(), e);
+      } else {
+        LOG.warn("Metric report error:" + e.getMessage());
+      }
+    }
+  }
+
+  public Map<String, Metric> getMetrics() {
+    return metricRegistry.getMetrics();
+  }
+
+  public SortedMap<String, Gauge> getGuageMetrics(MetricFilter filter) {
+    if(filter == null) {
+      filter = MetricFilter.ALL;
+    }
+    return metricRegistry.getGauges(filter);
+  }
+
+  public SortedMap<String, Counter> getCounterMetrics(MetricFilter filter) {
+    if(filter == null) {
+      filter = MetricFilter.ALL;
+    }
+    return metricRegistry.getCounters(filter);
+  }
+
+  public SortedMap<String, Histogram> getHistogramMetrics(MetricFilter filter) {
+    if(filter == null) {
+      filter = MetricFilter.ALL;
+    }
+    return metricRegistry.getHistograms(filter);
+  }
+
+  public SortedMap<String, Meter> getMeterMetrics(MetricFilter filter) {
+    if(filter == null) {
+      filter = MetricFilter.ALL;
+    }
+    return metricRegistry.getMeters(filter);
+  }
+
+  public SortedMap<String, Timer> getTimerMetrics(MetricFilter filter) {
+    if(filter == null) {
+      filter = MetricFilter.ALL;
+    }
+    return metricRegistry.getTimers(filter);
+  }
+
+  public void register(String contextName, MetricSet metricSet) {
+    metricRegistry.register(MetricRegistry.name(metricsGroupName, contextName), metricSet);
+  }
+
+  public void register(String contextName, String itemName, Gauge gauge) {
+    metricRegistry.register(makeMetricsName(metricsGroupName, contextName, itemName), gauge);
+  }
+
+  public Counter counter(String contextName, String itemName) {
+    return metricRegistry.counter(makeMetricsName(metricsGroupName, contextName, itemName));
+  }
+
+  public Histogram histogram(String contextName, String itemName) {
+    return metricRegistry.histogram(makeMetricsName(metricsGroupName, contextName, itemName));
+  }
+
+  public Meter meter(String contextName, String itemName) {
+    return metricRegistry.meter(makeMetricsName(metricsGroupName, contextName, itemName));
+  }
+
+  public Timer timer(String contextName, String itemName) {
+    return metricRegistry.timer(makeMetricsName(metricsGroupName, contextName, itemName));
+  }
+
+  public static String makeMetricsName(String metricsGroupName, String contextName, String itemName) {
+    return MetricRegistry.name(metricsGroupName, contextName, itemName);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/util/metrics/TajoSystemMetrics.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/metrics/TajoSystemMetrics.java b/tajo-core/src/main/java/org/apache/tajo/util/metrics/TajoSystemMetrics.java
new file mode 100644
index 0000000..4192ca0
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/metrics/TajoSystemMetrics.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.jvm.FileDescriptorRatioGauge;
+import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
+import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
+import com.codahale.metrics.jvm.ThreadStatesGaugeSet;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.configuration.event.ConfigurationEvent;
+import org.apache.commons.configuration.event.ConfigurationListener;
+import org.apache.commons.configuration.reloading.FileChangedReloadingStrategy;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.util.metrics.reporter.TajoMetricsScheduledReporter;
+
+import java.util.*;
+
+public class TajoSystemMetrics extends TajoMetrics {
+  private static final Log LOG = LogFactory.getLog(TajoSystemMetrics.class);
+
+  private PropertiesConfiguration metricsProps;
+
+  private Thread propertyChangeChecker;
+
+  private String hostAndPort;
+
+  private List<TajoMetricsScheduledReporter> metricsReporters = new ArrayList<TajoMetricsScheduledReporter>();
+
+  private boolean inited = false;
+
+  private String metricsPropertyFileName;
+
+  public TajoSystemMetrics(TajoConf tajoConf, String metricsGroupName, String hostAndPort) {
+    super(metricsGroupName);
+
+    this.hostAndPort = hostAndPort;
+    try {
+      this.metricsPropertyFileName = tajoConf.getVar(TajoConf.ConfVars.METRICS_PROPERTY_FILENAME);
+      this.metricsProps = new PropertiesConfiguration(metricsPropertyFileName);
+      this.metricsProps.addConfigurationListener(new MetricsReloadListener());
+      FileChangedReloadingStrategy reloadingStrategy = new FileChangedReloadingStrategy();
+      reloadingStrategy.setRefreshDelay(5 * 1000);
+      this.metricsProps.setReloadingStrategy(reloadingStrategy);
+    } catch (ConfigurationException e) {
+      LOG.warn(e.getMessage(), e);
+    }
+
+    //PropertiesConfiguration fire configurationChanged after getXXX()
+    //So neeaded calling getXXX periodically
+    propertyChangeChecker = new Thread() {
+      public void run() {
+        while(!stop.get()) {
+          String value = metricsProps.getString("reporter.file");
+          try {
+            Thread.sleep(10 * 1000);
+          } catch (InterruptedException e) {
+          }
+        }
+      }
+    };
+
+    propertyChangeChecker.start();
+  }
+
+  public Collection<TajoMetricsScheduledReporter> getMetricsReporters() {
+    synchronized (metricsReporters) {
+      return Collections.unmodifiableCollection(metricsReporters);
+    }
+  }
+
+  @Override
+  public void stop() {
+    super.stop();
+    if(propertyChangeChecker != null) {
+      propertyChangeChecker.interrupt();
+    }
+    stopAndClearReporter();
+  }
+
+  protected void stopAndClearReporter() {
+    synchronized(metricsReporters) {
+      for(TajoMetricsScheduledReporter eachReporter: metricsReporters) {
+        eachReporter.close();
+      }
+
+      metricsReporters.clear();
+    }
+  }
+
+  public void start() {
+    setMetricsReporter(metricsGroupName);
+
+    String jvmMetricsName = metricsGroupName + "-jvm";
+    setMetricsReporter(jvmMetricsName);
+
+    if(!inited) {
+      metricRegistry.register(MetricRegistry.name(jvmMetricsName, "Heap"), new MemoryUsageGaugeSet());
+      metricRegistry.register(MetricRegistry.name(jvmMetricsName, "File"), new FileDescriptorRatioGauge());
+      metricRegistry.register(MetricRegistry.name(jvmMetricsName, "GC"), new GarbageCollectorMetricSet());
+      metricRegistry.register(MetricRegistry.name(jvmMetricsName, "Thread"), new ThreadStatesGaugeSet());
+      metricRegistry.register(MetricRegistry.name(jvmMetricsName, "Log"), new LogEventGaugeSet());
+    }
+    inited = true;
+  }
+
+  private void setMetricsReporter(String groupName) {
+    //reporter name -> class name
+    Map<String, String> reporters = new HashMap<String, String>();
+
+    List<String> reporterNames = metricsProps.getList(groupName + ".reporters");
+    if(reporterNames.isEmpty()) {
+      LOG.warn("No property " + groupName + ".reporters in " + metricsPropertyFileName);
+      return;
+    }
+
+    Map<String, String> allReporterProperties = new HashMap<String, String>();
+
+    Iterator<String> keys = metricsProps.getKeys();
+    while (keys.hasNext()) {
+      String key = keys.next();
+      String value = metricsProps.getString(key);
+      if(key.indexOf("reporter.") == 0) {
+        String[] tokens = key.split("\\.");
+        if(tokens.length == 2) {
+          reporters.put(tokens[1], value);
+        }
+      } else if(key.indexOf(groupName + ".") == 0) {
+        String[] tokens = key.split("\\.");
+        if(tokens.length > 2) {
+          allReporterProperties.put(key, value);
+        }
+      }
+    }
+
+    synchronized(metricsReporters) {
+      for(String eachReporterName: reporterNames) {
+        if("null".equals(eachReporterName)) {
+          continue;
+        }
+        String reporterClass = reporters.get(eachReporterName);
+        if(reporterClass == null) {
+          LOG.warn("No metrics reporter definition[" + eachReporterName + "] in " + metricsPropertyFileName);
+          continue;
+        }
+
+        Map<String, String> eachMetricsReporterProperties = findMetircsProperties(allReporterProperties,
+            groupName + "." + eachReporterName);
+
+        try {
+          Object reporterObject = Class.forName(reporterClass).newInstance();
+          if(!(reporterObject instanceof TajoMetricsScheduledReporter)) {
+            LOG.warn(reporterClass + " is not subclass of " + TajoMetricsScheduledReporter.class.getCanonicalName());
+            continue;
+          }
+          TajoMetricsScheduledReporter reporter = (TajoMetricsScheduledReporter)reporterObject;
+          reporter.init(metricRegistry, groupName, hostAndPort, eachMetricsReporterProperties);
+          reporter.start();
+
+          metricsReporters.add(reporter);
+          LOG.info("Started metrics reporter " + reporter.getClass().getCanonicalName() + " for " + groupName);
+        } catch (ClassNotFoundException e) {
+          LOG.warn("No metrics reporter class[" + eachReporterName + "], required class= " + reporterClass);
+          continue;
+        } catch (Exception e) {
+          LOG.warn("Can't initiate metrics reporter class[" + eachReporterName + "]" + e.getMessage() , e);
+          continue;
+        }
+      }
+    }
+  }
+
+  private Map<String, String> findMetircsProperties(Map<String, String> allReporterProperties, String findKey) {
+    Map<String, String> metricsProperties = new HashMap<String, String>();
+
+    for (Map.Entry<String, String> entry: allReporterProperties.entrySet()) {
+      String eachKey = entry.getKey();
+      if (eachKey.indexOf(findKey) == 0) {
+        metricsProperties.put(eachKey, entry.getValue());
+      }
+    }
+    return metricsProperties;
+  }
+
+  class MetricsReloadListener implements ConfigurationListener {
+    @Override
+    public synchronized void configurationChanged(ConfigurationEvent event) {
+      if (!event.isBeforeUpdate()) {
+        stopAndClearReporter();
+        start();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/GangliaReporter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/GangliaReporter.java b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/GangliaReporter.java
new file mode 100644
index 0000000..b9acf0e
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/GangliaReporter.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics.reporter;
+
+import com.codahale.metrics.*;
+import info.ganglia.gmetric4j.gmetric.GMetric;
+import info.ganglia.gmetric4j.gmetric.GMetricSlope;
+import info.ganglia.gmetric4j.gmetric.GMetricType;
+import info.ganglia.gmetric4j.gmetric.GangliaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.SortedMap;
+
+import static com.codahale.metrics.MetricRegistry.name;
+
+public class GangliaReporter extends TajoMetricsScheduledReporter {
+  private static final Logger LOG = LoggerFactory.getLogger(GangliaReporter.class);
+  public static final String REPORTER_NAME = "ganglia";
+
+  private GMetric ganglia;
+  private String prefix;
+  private int tMax = 60;
+  private int dMax = 0;
+
+  @Override
+  protected String getReporterName() {
+    return REPORTER_NAME;
+  }
+
+  @Override
+  protected void afterInit() {
+    String server = metricsProperties.get(metricsPropertyKey + "server");
+    String port = metricsProperties.get(metricsPropertyKey + "port");
+
+    if(server == null || server.isEmpty()) {
+      LOG.warn("No " + metricsPropertyKey + "server property in tajo-metrics.properties");
+      return;
+    }
+
+    if(port == null || port.isEmpty()) {
+      LOG.warn("No " + metricsPropertyKey + "port property in tajo-metrics.properties");
+      return;
+    }
+
+    try {
+      ganglia = new GMetric(server, Integer.parseInt(port), GMetric.UDPAddressingMode.MULTICAST, 1);
+    } catch (Exception e) {
+      LOG.warn(e.getMessage(), e);
+    }
+  }
+
+  public void setPrefix(String prefix) {
+    this.prefix = prefix;
+  }
+
+  public void settMax(int tMax) {
+    this.tMax = tMax;
+  }
+
+  public void setdMax(int dMax) {
+    this.dMax = dMax;
+  }
+
+  @Override
+  public void report(SortedMap<String, Gauge> gauges,
+                     SortedMap<String, Counter> counters,
+                     SortedMap<String, Histogram> histograms,
+                     SortedMap<String, Meter> meters,
+                     SortedMap<String, Timer> timers) {
+    for (Map.Entry<String, Gauge> entry : gauges.entrySet()) {
+      reportGauge(entry.getKey(), entry.getValue());
+    }
+
+    for (Map.Entry<String, Counter> entry : counters.entrySet()) {
+      reportCounter(entry.getKey(), entry.getValue());
+    }
+
+    for (Map.Entry<String, Histogram> entry : histograms.entrySet()) {
+      reportHistogram(entry.getKey(), entry.getValue());
+    }
+
+    for (Map.Entry<String, Meter> entry : meters.entrySet()) {
+      reportMeter(entry.getKey(), entry.getValue());
+    }
+
+    for (Map.Entry<String, Timer> entry : timers.entrySet()) {
+      reportTimer(entry.getKey(), entry.getValue());
+    }
+  }
+
+  private void reportTimer(String name, Timer timer) {
+    final String group = group(name);
+    try {
+      final Snapshot snapshot = timer.getSnapshot();
+
+      announce(prefix(name, "max"), group, convertDuration(snapshot.getMax()), getDurationUnit());
+      announce(prefix(name, "mean"), group, convertDuration(snapshot.getMean()), getDurationUnit());
+      announce(prefix(name, "min"), group, convertDuration(snapshot.getMin()), getDurationUnit());
+      announce(prefix(name, "stddev"), group, convertDuration(snapshot.getStdDev()), getDurationUnit());
+
+      announce(prefix(name, "p50"), group, convertDuration(snapshot.getMedian()), getDurationUnit());
+      announce(prefix(name, "p75"),
+          group,
+          convertDuration(snapshot.get75thPercentile()),
+          getDurationUnit());
+      announce(prefix(name, "p95"),
+          group,
+          convertDuration(snapshot.get95thPercentile()),
+          getDurationUnit());
+      announce(prefix(name, "p98"),
+          group,
+          convertDuration(snapshot.get98thPercentile()),
+          getDurationUnit());
+      announce(prefix(name, "p99"),
+          group,
+          convertDuration(snapshot.get99thPercentile()),
+          getDurationUnit());
+      announce(prefix(name, "p999"),
+          group,
+          convertDuration(snapshot.get999thPercentile()),
+          getDurationUnit());
+
+      reportMetered(name, timer, group, "calls");
+    } catch (GangliaException e) {
+      LOG.warn("Unable to report timer {}", name, e);
+    }
+  }
+
+  private void reportMeter(String name, Meter meter) {
+    final String group = group(name);
+    try {
+      reportMetered(name, meter, group, "events");
+    } catch (GangliaException e) {
+      LOG.warn("Unable to report meter {}", name, e);
+    }
+  }
+
+  private void reportMetered(String name, Metered meter, String group, String eventName) throws GangliaException {
+    final String unit = eventName + '/' + getRateUnit();
+    announce(prefix(name, "count"), group, meter.getCount(), eventName);
+    announce(prefix(name, "m1_rate"), group, convertRate(meter.getOneMinuteRate()), unit);
+    announce(prefix(name, "m5_rate"), group, convertRate(meter.getFiveMinuteRate()), unit);
+    announce(prefix(name, "m15_rate"), group, convertRate(meter.getFifteenMinuteRate()), unit);
+    announce(prefix(name, "mean_rate"), group, convertRate(meter.getMeanRate()), unit);
+  }
+
+  private void reportHistogram(String name, Histogram histogram) {
+    final String group = group(name);
+    try {
+      final Snapshot snapshot = histogram.getSnapshot();
+
+      announce(prefix(name, "count"), group, histogram.getCount(), "");
+      announce(prefix(name, "max"), group, snapshot.getMax(), "");
+      announce(prefix(name, "mean"), group, snapshot.getMean(), "");
+      announce(prefix(name, "min"), group, snapshot.getMin(), "");
+      announce(prefix(name, "stddev"), group, snapshot.getStdDev(), "");
+      announce(prefix(name, "p50"), group, snapshot.getMedian(), "");
+      announce(prefix(name, "p75"), group, snapshot.get75thPercentile(), "");
+      announce(prefix(name, "p95"), group, snapshot.get95thPercentile(), "");
+      announce(prefix(name, "p98"), group, snapshot.get98thPercentile(), "");
+      announce(prefix(name, "p99"), group, snapshot.get99thPercentile(), "");
+      announce(prefix(name, "p999"), group, snapshot.get999thPercentile(), "");
+    } catch (GangliaException e) {
+      LOG.warn("Unable to report histogram {}", name, e);
+    }
+  }
+
+  private void reportCounter(String name, Counter counter) {
+    final String group = group(name);
+    try {
+      announce(prefix(name, "count"), group, counter.getCount(), "");
+    } catch (GangliaException e) {
+      LOG.warn("Unable to report counter {}", name, e);
+    }
+  }
+
+  private void reportGauge(String name, Gauge gauge) {
+    final String group = group(name);
+    final Object obj = gauge.getValue();
+
+    try {
+      ganglia.announce(name(prefix, name), String.valueOf(obj), detectType(obj), "",
+          GMetricSlope.BOTH, tMax, dMax, group);
+    } catch (GangliaException e) {
+      LOG.warn("Unable to report gauge {}", name, e);
+    }
+  }
+
+  private void announce(String name, String group, double value, String units) throws GangliaException {
+    ganglia.announce(name,
+        Double.toString(value),
+        GMetricType.DOUBLE,
+        units,
+        GMetricSlope.BOTH,
+        tMax,
+        dMax,
+        group);
+  }
+
+  private void announce(String name, String group, long value, String units) throws GangliaException {
+    final String v = Long.toString(value);
+    ganglia.announce(name,
+        v,
+        GMetricType.DOUBLE,
+        units,
+        GMetricSlope.BOTH,
+        tMax,
+        dMax,
+        group);
+  }
+
+  private GMetricType detectType(Object o) {
+    if (o instanceof Float) {
+      return GMetricType.FLOAT;
+    } else if (o instanceof Double) {
+      return GMetricType.DOUBLE;
+    } else if (o instanceof Byte) {
+      return GMetricType.INT8;
+    } else if (o instanceof Short) {
+      return GMetricType.INT16;
+    } else if (o instanceof Integer) {
+      return GMetricType.INT32;
+    } else if (o instanceof Long) {
+      return GMetricType.DOUBLE;
+    }
+    return GMetricType.STRING;
+  }
+
+  private String group(String name) {
+    String[] tokens = name.split("\\.");
+    if(tokens.length < 3) {
+      return "";
+    }
+    return tokens[0] + "." + tokens[1];
+  }
+
+  private String prefix(String name, String n) {
+    return name(prefix, name, n);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsConsoleReporter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsConsoleReporter.java b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsConsoleReporter.java
new file mode 100644
index 0000000..80b77f1
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsConsoleReporter.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics.reporter;
+
+import com.codahale.metrics.*;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.concurrent.TimeUnit;
+
+public class MetricsConsoleReporter extends TajoMetricsReporter {
+  @Override
+  public void report(SortedMap<String, Gauge> gauges,
+                     SortedMap<String, Counter> counters,
+                     SortedMap<String, Histogram> histograms,
+                     SortedMap<String, Meter> meters,
+                     SortedMap<String, Timer> timers) {
+    SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    final String dateTime = dateFormat.format(new Date());
+    double rateFactor = TimeUnit.SECONDS.toSeconds(1);
+
+    if (!gauges.isEmpty()) {
+      Map<String, Map<String, Gauge>> gaugeGroups = findMetricsItemGroup(gauges);
+
+      for(Map.Entry<String, Map<String, Gauge>> eachGroup: gaugeGroups.entrySet()) {
+        System.out.println(gaugeGroupToString(dateTime, null, rateFactor, eachGroup.getKey(), eachGroup.getValue()));
+      }
+    }
+
+    if (!counters.isEmpty()) {
+      Map<String, Map<String, Counter>> counterGroups = findMetricsItemGroup(counters);
+
+      for(Map.Entry<String, Map<String, Counter>> eachGroup: counterGroups.entrySet()) {
+        System.out.println(counterGroupToString(dateTime, null, rateFactor, eachGroup.getKey(), eachGroup.getValue()));
+      }
+    }
+
+    if (!histograms.isEmpty()) {
+      Map<String, Map<String, Histogram>> histogramGroups = findMetricsItemGroup(histograms);
+
+      for(Map.Entry<String, Map<String, Histogram>> eachGroup: histogramGroups.entrySet()) {
+        System.out.println(histogramGroupToString(dateTime, null, rateFactor, eachGroup.getKey(), eachGroup.getValue()));
+      }
+    }
+
+    if (!meters.isEmpty()) {
+      Map<String, Map<String, Meter>> meterGroups = findMetricsItemGroup(meters);
+
+      for(Map.Entry<String, Map<String, Meter>> eachGroup: meterGroups.entrySet()) {
+        System.out.println(meterGroupToString(dateTime, null, rateFactor, eachGroup.getKey(), eachGroup.getValue()));
+      }
+    }
+
+    if (!timers.isEmpty()) {
+      Map<String, Map<String, Timer>> timerGroups = findMetricsItemGroup(timers);
+
+      for(Map.Entry<String, Map<String, Timer>> eachGroup: timerGroups.entrySet()) {
+        System.out.println(timerGroupToString(dateTime, null, rateFactor, eachGroup.getKey(), eachGroup.getValue()));
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsConsoleScheduledReporter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsConsoleScheduledReporter.java b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsConsoleScheduledReporter.java
new file mode 100644
index 0000000..286ef8d
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsConsoleScheduledReporter.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics.reporter;
+
+public class MetricsConsoleScheduledReporter extends MetricsStreamScheduledReporter {
+  public static final String REPORTER_NAME = "console";
+  @Override
+  protected String getReporterName() {
+    return REPORTER_NAME;
+  }
+
+  @Override
+  protected void afterInit() {
+    setOutput(System.out);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsFileScheduledReporter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsFileScheduledReporter.java b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsFileScheduledReporter.java
new file mode 100644
index 0000000..9e895b8
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsFileScheduledReporter.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics.reporter;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+
+public class MetricsFileScheduledReporter extends MetricsStreamScheduledReporter {
+  private static final Log LOG = LogFactory.getLog(MetricsFileScheduledReporter.class);
+  public static final String REPORTER_NAME = "file";
+
+  protected String getReporterName() {
+    return REPORTER_NAME;
+  }
+
+  @Override
+  protected void afterInit() {
+    String fileName = metricsProperties.get(metricsPropertyKey + "filename");
+    if(fileName == null) {
+      LOG.warn("No " + metricsPropertyKey + "filename property in tajo-metrics.properties");
+      return;
+    }
+    try {
+      File file = new File(fileName);
+      File parentFile = file.getParentFile();
+      if(parentFile != null && !parentFile.exists()) {
+        if(!parentFile.mkdirs()) {
+          LOG.warn("Can't create dir for tajo metrics:" + parentFile.getAbsolutePath());
+        }
+      }
+      this.setOutput(new FileOutputStream(fileName, true));
+      this.setDateFormat(null);
+    } catch (FileNotFoundException e) {
+      LOG.warn("Can't open metrics file:" + fileName);
+      this.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsStreamScheduledReporter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsStreamScheduledReporter.java b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsStreamScheduledReporter.java
new file mode 100644
index 0000000..4fbefd7
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsStreamScheduledReporter.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics.reporter;
+
+import com.codahale.metrics.*;
+import com.codahale.metrics.Timer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+public abstract class MetricsStreamScheduledReporter extends TajoMetricsScheduledReporter {
+  private static final Log LOG = LogFactory.getLog(MetricsStreamScheduledReporter.class);
+
+  protected OutputStream output;
+  protected Locale locale;
+  protected Clock clock;
+  protected TimeZone timeZone;
+  protected MetricFilter filter;
+  protected DateFormat dateFormat;
+
+  private final byte[] NEW_LINE = "\n".getBytes();
+
+  public MetricsStreamScheduledReporter() {
+    dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    clock = Clock.defaultClock();
+  }
+
+  public void setOutput(OutputStream output) {
+    this.output = output;
+  }
+
+  public void setLocale(Locale locale) {
+    this.locale = locale;
+  }
+
+  public void setClock(Clock clock) {
+    this.clock = clock;
+  }
+
+  public void setTimeZone(TimeZone timeZone) {
+    this.dateFormat.setTimeZone(timeZone);
+    this.timeZone = timeZone;
+  }
+
+  public void setDateFormat(DateFormat dateFormat) {
+    this.dateFormat = dateFormat;
+  }
+
+  @Override
+  public void report(SortedMap<String, Gauge> gauges,
+                     SortedMap<String, Counter> counters,
+                     SortedMap<String, Histogram> histograms,
+                     SortedMap<String, Meter> meters,
+                     SortedMap<String, Timer> timers) {
+    final String dateTime = dateFormat == null ? "" + clock.getTime() : dateFormat.format(new Date(clock.getTime()));
+
+    if (!gauges.isEmpty()) {
+      Map<String, Map<String, Gauge>> gaugeGroups = findMetricsItemGroup(gauges);
+
+      for(Map.Entry<String, Map<String, Gauge>> eachGroup: gaugeGroups.entrySet()) {
+        printGaugeGroup(dateTime, eachGroup.getKey(), eachGroup.getValue());
+      }
+    }
+
+    if (!counters.isEmpty()) {
+      Map<String, Map<String, Counter>> counterGroups = findMetricsItemGroup(counters);
+
+      for(Map.Entry<String, Map<String, Counter>> eachGroup: counterGroups.entrySet()) {
+        printCounterGroup(dateTime, eachGroup.getKey(), eachGroup.getValue());
+      }
+    }
+
+    if (!histograms.isEmpty()) {
+      Map<String, Map<String, Histogram>> histogramGroups = findMetricsItemGroup(histograms);
+
+      for(Map.Entry<String, Map<String, Histogram>> eachGroup: histogramGroups.entrySet()) {
+        printHistogramGroup(dateTime, eachGroup.getKey(), eachGroup.getValue());
+      }
+    }
+
+    if (!meters.isEmpty()) {
+      Map<String, Map<String, Meter>> meterGroups = findMetricsItemGroup(meters);
+
+      for(Map.Entry<String, Map<String, Meter>> eachGroup: meterGroups.entrySet()) {
+        printMeterGroup(dateTime, eachGroup.getKey(), eachGroup.getValue());
+      }
+    }
+
+    if (!timers.isEmpty()) {
+      Map<String, Map<String, Timer>> timerGroups = findMetricsItemGroup(timers);
+
+      for(Map.Entry<String, Map<String, Timer>> eachGroup: timerGroups.entrySet()) {
+        printTimerGroup(dateTime, eachGroup.getKey(), eachGroup.getValue());
+      }
+    }
+    try {
+      output.flush();
+    } catch (IOException e) {
+    }
+  }
+
+  private void printMeterGroup(String dateTime, String groupName, Map<String, Meter> meters) {
+    try {
+      output.write(meterGroupToString(dateTime, hostAndPort, rateFactor, groupName, meters).getBytes());
+      output.write(NEW_LINE);
+    } catch (Exception e) {
+      LOG.warn(e.getMessage(), e);
+    }
+  }
+
+  private void printCounterGroup(String dateTime, String groupName, Map<String, Counter> counters) {
+    try {
+      output.write(counterGroupToString(dateTime, hostAndPort, rateFactor, groupName, counters).getBytes());
+      output.write(NEW_LINE);
+    } catch (Exception e) {
+      LOG.warn(e.getMessage(), e);
+    }
+  }
+
+  private void printGaugeGroup(String dateTime, String groupName, Map<String, Gauge> gauges) {
+    try {
+      output.write(gaugeGroupToString(dateTime, hostAndPort, rateFactor, groupName, gauges).getBytes());
+      output.write(NEW_LINE);
+    } catch (Exception e) {
+      LOG.warn(e.getMessage(), e);
+    }
+  }
+
+  private void printHistogramGroup(String dateTime, String groupName, Map<String, Histogram> histograms) {
+    try {
+      output.write(histogramGroupToString(dateTime, hostAndPort, rateFactor, groupName, histograms).getBytes());
+      output.write(NEW_LINE);
+    } catch (Exception e) {
+      LOG.warn(e.getMessage(), e);
+    }
+  }
+
+  private void printTimerGroup(String dateTime, String groupName, Map<String, Timer> timers) {
+    try {
+      output.write(timerGroupToString(dateTime, hostAndPort, rateFactor, groupName, timers).getBytes());
+      output.write(NEW_LINE);
+    } catch (Exception e) {
+      LOG.warn(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public void close() {
+    if(output != null) {
+      try {
+        output.close();
+      } catch (IOException e) {
+      }
+    }
+
+    super.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/NullReporter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/NullReporter.java b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/NullReporter.java
new file mode 100644
index 0000000..9dc1755
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/metrics/reporter/NullReporter.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics.reporter;
+
+import com.codahale.metrics.*;
+
+import java.util.SortedMap;
+
+public class NullReporter extends TajoMetricsReporter {
+  @Override
+  public void report(SortedMap<String, Gauge> gauges, SortedMap<String, Counter> counters,
+                     SortedMap<String, Histogram> histograms, SortedMap<String, Meter> meters,
+                     SortedMap<String, Timer> timers) {
+  }
+}