You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2013/12/28 07:36:09 UTC

[12/50] [abbrv] git commit: TAJO-333: Add metric system to Tajo. (hyoungjunkim via jihoon)

TAJO-333: Add metric system to Tajo. (hyoungjunkim via jihoon)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/62c49c05
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/62c49c05
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/62c49c05

Branch: refs/heads/DAG-execplan
Commit: 62c49c05f522158d75d818df49100a0b1fd354bb
Parents: 1d0d458
Author: Jihoon Son <ji...@apache.org>
Authored: Sat Dec 14 12:32:41 2013 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Sat Dec 14 12:33:40 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../java/org/apache/tajo/conf/TajoConf.java     |   5 +-
 tajo-core/tajo-core-backend/pom.xml             |  16 ++
 .../src/main/java/log4j.properties              |   6 +-
 .../org/apache/tajo/master/GlobalEngine.java    |   7 +-
 .../java/org/apache/tajo/master/TajoMaster.java |  26 +-
 .../master/metrics/CatalogMetricsGaugeSet.java  |  54 ++++
 .../metrics/WorkerResourceMetricsGaugeSet.java  |  74 ++++++
 .../querymaster/QueryMasterManagerService.java  |   3 +
 .../master/querymaster/QueryMasterTask.java     |  13 +
 .../util/metrics/GroupNameMetricsFilter.java    |  43 ++++
 .../tajo/util/metrics/LogEventGaugeSet.java     |  64 +++++
 .../tajo/util/metrics/MetricsFilterList.java    |  43 ++++
 .../tajo/util/metrics/RegexpMetricsFilter.java  |  51 ++++
 .../tajo/util/metrics/TajoLogEventCounter.java  |  86 +++++++
 .../apache/tajo/util/metrics/TajoMetrics.java   | 133 ++++++++++
 .../tajo/util/metrics/TajoSystemMetrics.java    | 213 +++++++++++++++
 .../util/metrics/reporter/GangliaReporter.java  | 258 +++++++++++++++++++
 .../reporter/MetricsConsoleReporter.java        |  80 ++++++
 .../MetricsConsoleScheduledReporter.java        |  32 +++
 .../reporter/MetricsFileScheduledReporter.java  |  57 ++++
 .../MetricsStreamScheduledReporter.java         | 179 +++++++++++++
 .../util/metrics/reporter/NullReporter.java     |  31 +++
 .../metrics/reporter/TajoMetricsReporter.java   | 232 +++++++++++++++++
 .../reporter/TajoMetricsScheduledReporter.java  | 206 +++++++++++++++
 .../java/org/apache/tajo/worker/TajoWorker.java |  40 +++
 .../tajo/worker/TajoWorkerManagerService.java   |   1 +
 .../java/org/apache/tajo/worker/TaskRunner.java |   2 +-
 .../apache/tajo/worker/TaskRunnerManager.java   |   4 +
 .../src/main/resources/tajo-metrics.properties  |  75 ++++++
 .../tajo/util/metrics/TestMetricsFilter.java    |  52 ++++
 .../tajo/util/metrics/TestSystemMetrics.java    | 133 ++++++++++
 32 files changed, 2215 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 452cace..2f7d1db 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,8 @@ Release 0.8.0 - unreleased
 
   NEW FEATURES
 
+    TAJO-333: Add metric system to Tajo. (hyoungjunkim via jihoon)
+
     TAJO-413: Implement pi function. (DaeMyung Kang via jihoon)
 
     TAJO-61: Implement Time Datum Type. (DaeMyung Kang via hyunsik)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 0d9bbb0..fb1c29b 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -195,8 +195,11 @@ public class TajoConf extends YarnConfiguration {
     //////////////////////////////////
     // Task Configuration
     TASK_DEFAULT_MEMORY("tajo.task.memory-slot-mb.default", 512),
-    TASK_DEFAULT_DISK("tajo.task.disk-slot.default", 1.0f)
+    TASK_DEFAULT_DISK("tajo.task.disk-slot.default", 1.0f),
     //////////////////////////////////
+
+    // Metrics
+    METRICS_PROPERTY_FILENAME("tajo.metrics.property.file", "tajo-metrics.properties")
     ;
 
     public final String varname;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/pom.xml b/tajo-core/tajo-core-backend/pom.xml
index fca3372..07117a6 100644
--- a/tajo-core/tajo-core-backend/pom.xml
+++ b/tajo-core/tajo-core-backend/pom.xml
@@ -35,6 +35,7 @@
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
     <antlr4.visitor>true</antlr4.visitor>
     <antlr4.listener>true</antlr4.listener>
+    <metrics.version>3.0.1</metrics.version>
   </properties>
 
   <repositories>
@@ -379,6 +380,21 @@
       <version>6.1.14</version>
     </dependency>
 
+    <dependency>
+      <groupId>com.codahale.metrics</groupId>
+      <artifactId>metrics-core</artifactId>
+      <version>${metrics.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.codahale.metrics</groupId>
+      <artifactId>metrics-jvm</artifactId>
+      <version>${metrics.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>info.ganglia.gmetric4j</groupId>
+      <artifactId>gmetric4j</artifactId>
+      <version>1.0.3</version>
+    </dependency>
   </dependencies>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/log4j.properties
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/log4j.properties b/tajo-core/tajo-core-backend/src/main/java/log4j.properties
index 749124c..15e5778 100644
--- a/tajo-core/tajo-core-backend/src/main/java/log4j.properties
+++ b/tajo-core/tajo-core-backend/src/main/java/log4j.properties
@@ -18,11 +18,13 @@
 
 # log4j configuration used during build and unit tests
 
-log4j.rootLogger=info,stdout
+log4j.rootLogger=info,stdout,EventCounter
 log4j.threshhold=ALL
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
 
 log4j.logger.org.apache.hadoop=WARN
-log4j.logger.org.apache.hadoop.conf=ERROR
\ No newline at end of file
+log4j.logger.org.apache.hadoop.conf=ERROR
+
+log4j.appender.EventCounter=org.apache.tajo.util.metrics.TajoLogEventCounter
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 2412637..10f42c5 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -92,7 +92,6 @@ public class GlobalEngine extends AbstractService {
       hookManager = new DistributedQueryHookManager();
       hookManager.addHook(new CreateTableHook());
       hookManager.addHook(new InsertHook());
-
     } catch (Throwable t) {
       LOG.error(t.getMessage(), t);
     }
@@ -128,9 +127,12 @@ public class GlobalEngine extends AbstractService {
       LOG.info("hive.query.mode:" + hiveQueryMode);
 
       if (hiveQueryMode) {
+        context.getSystemMetrics().counter("Query", "numHiveMode").inc();
         queryContext.setHiveQueryMode();
       }
 
+      context.getSystemMetrics().counter("Query", "totalQuery").inc();
+
       Expr planningContext = hiveQueryMode ? converter.parse(sql) : analyzer.parse(sql);
 
       LogicalPlan plan = createLogicalPlan(planningContext);
@@ -138,11 +140,13 @@ public class GlobalEngine extends AbstractService {
 
       GetQueryStatusResponse.Builder responseBuilder = GetQueryStatusResponse.newBuilder();
       if (PlannerUtil.checkIfDDLPlan(rootNode)) {
+        context.getSystemMetrics().counter("Query", "numDDLQuery").inc();
         updateQuery(rootNode.getChild());
         responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
         responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
         responseBuilder.setState(TajoProtos.QueryState.QUERY_SUCCEEDED);
       } else {
+        context.getSystemMetrics().counter("Query", "numDMLQuery").inc();
         hookManager.doHooks(queryContext, plan);
 
         QueryJobManager queryJobManager = this.context.getQueryJobManager();
@@ -169,6 +173,7 @@ public class GlobalEngine extends AbstractService {
 
       return response;
     } catch (Throwable t) {
+      context.getSystemMetrics().counter("Query", "errorQuery").inc();
       LOG.error("\nStack Trace:\n" + StringUtils.stringifyException(t));
       GetQueryStatusResponse.Builder responseBuilder = GetQueryStatusResponse.newBuilder();
       responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
index f98cecb..e3d4c01 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -50,6 +50,8 @@ import org.apache.tajo.engine.function.datetime.ToCharTimestamp;
 import org.apache.tajo.engine.function.datetime.ToTimestamp;
 import org.apache.tajo.engine.function.math.*;
 import org.apache.tajo.engine.function.string.*;
+import org.apache.tajo.master.metrics.CatalogMetricsGaugeSet;
+import org.apache.tajo.master.metrics.WorkerResourceMetricsGaugeSet;
 import org.apache.tajo.master.querymaster.QueryJobManager;
 import org.apache.tajo.master.rm.TajoWorkerResourceManager;
 import org.apache.tajo.master.rm.WorkerResourceManager;
@@ -57,6 +59,7 @@ import org.apache.tajo.storage.AbstractStorageManager;
 import org.apache.tajo.storage.StorageManagerFactory;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.metrics.TajoSystemMetrics;
 import org.apache.tajo.webapp.QueryExecutorServlet;
 import org.apache.tajo.webapp.StaticHttpServer;
 
@@ -70,6 +73,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 public class TajoMaster extends CompositeService {
+  private static final String METRICS_GROUP_NAME = "tajomaster";
 
   /** Class Logger */
   private static final Log LOG = LogFactory.getLog(TajoMaster.class);
@@ -119,6 +123,8 @@ public class TajoMaster extends CompositeService {
 
   private ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
 
+  private TajoSystemMetrics systemMetrics;
+
   public TajoMaster() throws Exception {
     super(TajoMaster.class.getName());
   }
@@ -180,6 +186,14 @@ public class TajoMaster extends CompositeService {
     LOG.info("Tajo Master is initialized.");
   }
 
+  // Creates and starts the master's TajoSystemMetrics, then registers its two gauge sets.
+  private void initSystemMetrics() {
+    systemMetrics = new TajoSystemMetrics(systemConf, METRICS_GROUP_NAME, getMasterName());
+    systemMetrics.start();
+    systemMetrics.register("resource", new WorkerResourceMetricsGaugeSet(context));
+    systemMetrics.register("catalog", new CatalogMetricsGaugeSet(context));
+  }
+
   private void initResourceManager() throws Exception {
     Class<WorkerResourceManager>  resourceManagerClass = (Class<WorkerResourceManager>)
         systemConf.getClass(ConfVars.RESOURCE_MANAGER_CLASS.varname, TajoWorkerResourceManager.class);
@@ -826,8 +840,10 @@ public class TajoMaster extends CompositeService {
     try {
       writeSystemConf();
     } catch (IOException e) {
-      e.printStackTrace();
+      LOG.error(e.getMessage(), e);
     }
+
+    initSystemMetrics();
   }
 
   private void writeSystemConf() throws IOException {
@@ -862,6 +878,10 @@ public class TajoMaster extends CompositeService {
       }
     }
 
+    if(systemMetrics != null) {
+      systemMetrics.stop();
+    }
+
     super.stop();
     LOG.info("Tajo Master main thread exiting");
   }
@@ -928,6 +948,10 @@ public class TajoMaster extends CompositeService {
     public TajoMasterService getTajoMasterService() {
       return tajoMasterService;
     }
+
+    public TajoSystemMetrics getSystemMetrics() {
+      return systemMetrics;
+    }
   }
 
   String getThreadTaskName(long id, String name) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/metrics/CatalogMetricsGaugeSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/metrics/CatalogMetricsGaugeSet.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/metrics/CatalogMetricsGaugeSet.java
new file mode 100644
index 0000000..08fff53
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/metrics/CatalogMetricsGaugeSet.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.metrics;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricSet;
+import org.apache.tajo.master.TajoMaster;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class CatalogMetricsGaugeSet implements MetricSet {
+  TajoMaster.MasterContext tajoMasterContext;
+  public CatalogMetricsGaugeSet(TajoMaster.MasterContext tajoMasterContext) {
+    this.tajoMasterContext = tajoMasterContext;
+  }
+  /** @return a new map of "numTables" and "numFunctions" gauges; each asks the catalog when read */
+  @Override
+  public Map<String, Metric> getMetrics() {
+    Gauge<Integer> tableCount = new Gauge<Integer>() {
+      @Override
+      public Integer getValue() {
+        return tajoMasterContext.getCatalog().getAllTableNames().size();
+      }
+    };
+    Gauge<Integer> functionCount = new Gauge<Integer>() {
+      @Override
+      public Integer getValue() {
+        return tajoMasterContext.getCatalog().getFunctions().size();
+      }
+    };
+    Map<String, Metric> gauges = new HashMap<String, Metric>();
+    gauges.put("numTables", tableCount);
+    gauges.put("numFunctions", functionCount);
+    return gauges;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/metrics/WorkerResourceMetricsGaugeSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/metrics/WorkerResourceMetricsGaugeSet.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/metrics/WorkerResourceMetricsGaugeSet.java
new file mode 100644
index 0000000..1924041
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/metrics/WorkerResourceMetricsGaugeSet.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.metrics;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricSet;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.rm.WorkerResource;
+import org.apache.tajo.master.rm.WorkerStatus;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Exposes gauges counting all, live and dead workers reported by the resource manager. */
+public class WorkerResourceMetricsGaugeSet implements MetricSet {
+  TajoMaster.MasterContext tajoMasterContext;
+
+  public WorkerResourceMetricsGaugeSet(TajoMaster.MasterContext tajoMasterContext) {
+    this.tajoMasterContext = tajoMasterContext;
+  }
+
+  /** @return a new map with the "totalWorkers", "liveWorkers" and "deadWorkers" gauges */
+  @Override
+  public Map<String, Metric> getMetrics() {
+    Map<String, Metric> gauges = new HashMap<String, Metric>();
+    gauges.put("totalWorkers", new Gauge<Integer>() {
+      @Override
+      public Integer getValue() {
+        return tajoMasterContext.getResourceManager().getWorkers().size();
+      }
+    });
+    gauges.put("liveWorkers", countGauge(WorkerStatus.LIVE));
+    gauges.put("deadWorkers", countGauge(WorkerStatus.DEAD));
+    return gauges;
+  }
+
+  /** Wraps {@link #getNumWorkers(WorkerStatus)} in a gauge so the count is taken on each read. */
+  private Gauge<Integer> countGauge(final WorkerStatus status) {
+    return new Gauge<Integer>() {
+      @Override
+      public Integer getValue() {
+        return getNumWorkers(status);
+      }
+    };
+  }
+
+  /** @return how many workers listed by the resource manager have the given status */
+  protected int getNumWorkers(WorkerStatus status) {
+    int matching = 0;
+    for (WorkerResource worker : tajoMasterContext.getResourceManager().getWorkers().values()) {
+      if (worker.getWorkerStatus() == status) {
+        matching++;
+      }
+    }
+    return matching;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
index 4f0e128..78c417e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
@@ -194,6 +194,8 @@ public class QueryMasterManagerService extends CompositeService
                            TajoWorkerProtocol.QueryExecutionRequestProto request,
                            RpcCallback<PrimitiveProtos.BoolProto> done) {
     try {
+      workerContext.getWorkerSystemMetrics().counter("querymaster", "numQuery").inc();
+
       QueryId queryId = new QueryId(request.getQueryId());
       LOG.info("Receive executeQuery request:" + queryId);
       queryMaster.handle(new QueryStartEvent(queryId,
@@ -201,6 +203,7 @@ public class QueryMasterManagerService extends CompositeService
           request.getLogicalPlanJson().getValue()));
       done.run(TajoWorker.TRUE_PROTO);
     } catch (Exception e) {
+      workerContext.getWorkerSystemMetrics().counter("querymaster", "errorQuery").inc();
       LOG.error(e.getMessage(), e);
       done.run(TajoWorker.FALSE_PROTO);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index 8cd7d45..828ebfa 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -51,6 +51,8 @@ import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.util.metrics.TajoMetrics;
+import org.apache.tajo.util.metrics.reporter.MetricsConsoleReporter;
 import org.apache.tajo.worker.AbstractResourceAllocator;
 import org.apache.tajo.worker.TajoResourceAllocator;
 import org.apache.tajo.worker.YarnResourceAllocator;
@@ -99,6 +101,8 @@ public class QueryMasterTask extends CompositeService {
 
   private AtomicBoolean stopped = new AtomicBoolean(false);
 
+  private TajoMetrics queryMetrics;
+
   public QueryMasterTask(QueryMaster.QueryMasterContext queryMasterContext,
                          QueryId queryId, QueryContext queryContext, String sql, String logicalPlanJson) {
     super(QueryMasterTask.class.getName());
@@ -136,6 +140,8 @@ public class QueryMasterTask extends CompositeService {
 
       initStagingDir();
 
+      queryMetrics = new TajoMetrics(queryId.toString());
+
       super.init(systemConf);
     } catch (IOException e) {
       LOG.error(e.getMessage(), e);
@@ -186,6 +192,9 @@ public class QueryMasterTask extends CompositeService {
 
     super.stop();
 
+    //TODO change report to tajo master
+    queryMetrics.report(new MetricsConsoleReporter());
+
     LOG.info("Stopped QueryMasterTask:" + queryId);
   }
 
@@ -493,6 +502,10 @@ public class QueryMasterTask extends CompositeService {
     public AbstractResourceAllocator getResourceAllocator() {
       return resourceAllocator;
     }
+
+    public TajoMetrics getQueryMetrics() {
+      return queryMetrics;
+    }
   }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/GroupNameMetricsFilter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/GroupNameMetricsFilter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/GroupNameMetricsFilter.java
new file mode 100644
index 0000000..a273475
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/GroupNameMetricsFilter.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics;
+
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricFilter;
+
+/** Accepts metrics whose dot-separated name starts with a given group name. */
+public class GroupNameMetricsFilter implements MetricFilter {
+  String groupName;
+
+  /** @param groupName value the first segment of a metric name must equal */
+  public GroupNameMetricsFilter(String groupName) {
+    this.groupName = groupName;
+  }
+
+  /** @return true iff name is non-null and its text before the first '.' equals groupName */
+  @Override
+  public boolean matches(String name, Metric metric) {
+    if (name == null) {
+      return false;
+    }
+    // split("\\.") yields no tokens for dot-only names such as ".", so tokens[0] threw.
+    int dot = name.indexOf('.');
+    return groupName.equals(dot < 0 ? name : name.substring(0, dot));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/LogEventGaugeSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/LogEventGaugeSet.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/LogEventGaugeSet.java
new file mode 100644
index 0000000..6e130ff
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/LogEventGaugeSet.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricSet;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Exposes the four static counters of TajoLogEventCounter as gauges. */
+public class LogEventGaugeSet implements MetricSet {
+  /** @return a new map with the "Fatal", "Error", "Warn" and "Info" gauges */
+  @Override
+  public Map<String, Metric> getMetrics() {
+    Gauge<Long> fatal = new Gauge<Long>() {
+      @Override
+      public Long getValue() {
+        return TajoLogEventCounter.getFatal();
+      }
+    };
+    Gauge<Long> error = new Gauge<Long>() {
+      @Override
+      public Long getValue() {
+        return TajoLogEventCounter.getError();
+      }
+    };
+    Gauge<Long> warn = new Gauge<Long>() {
+      @Override
+      public Long getValue() {
+        return TajoLogEventCounter.getWarn();
+      }
+    };
+    Gauge<Long> info = new Gauge<Long>() {
+      @Override
+      public Long getValue() {
+        return TajoLogEventCounter.getInfo();
+      }
+    };
+    final Map<String, Metric> byLevel = new HashMap<String, Metric>();
+    byLevel.put("Fatal", fatal);
+    byLevel.put("Error", error);
+    byLevel.put("Warn", warn);
+    byLevel.put("Info", info);
+    return byLevel;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/MetricsFilterList.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/MetricsFilterList.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/MetricsFilterList.java
new file mode 100644
index 0000000..b2fc6e4
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/MetricsFilterList.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics;
+
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricFilter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Matches only when every added filter matches; with no filters added it matches everything. */
+public class MetricsFilterList implements MetricFilter {
+  List<MetricFilter> filters = new ArrayList<MetricFilter>();
+  public void addMetricFilter(MetricFilter filter) {
+    filters.add(filter);
+  }
+
+  @Override
+  public boolean matches(String name, Metric metric) {
+    for (MetricFilter candidate : filters) {
+      if (!candidate.matches(name, metric)) {
+        return false;
+      }
+    }
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/RegexpMetricsFilter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/RegexpMetricsFilter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/RegexpMetricsFilter.java
new file mode 100644
index 0000000..4faa3e7
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/RegexpMetricsFilter.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics;
+
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricFilter;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/** Accepts a metric when any configured regular expression is found in its name. */
+public class RegexpMetricsFilter implements MetricFilter {
+  List<Pattern> filterPatterns = new ArrayList<Pattern>();
+
+  /** @param filterExpressions regular expressions; each one is compiled here */
+  public RegexpMetricsFilter(Collection<String> filterExpressions) {
+    for (String expression : filterExpressions) {
+      filterPatterns.add(Pattern.compile(expression));
+    }
+  }
+
+  /** @return true if no pattern is configured, or if some pattern is found within name */
+  @Override
+  public boolean matches(String name, Metric metric) {
+    for (Pattern pattern : filterPatterns) {
+      if (pattern.matcher(name).find()) {
+        return true;
+      }
+    }
+    // No pattern hit: that still counts as a match when there were no patterns to try.
+    return filterPatterns.isEmpty();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/TajoLogEventCounter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/TajoLogEventCounter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/TajoLogEventCounter.java
new file mode 100644
index 0000000..3e44b02
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/TajoLogEventCounter.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics;
+
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Level;
+import org.apache.log4j.spi.LoggingEvent;
+
+/**
+ * Log4j appender that counts the FATAL, ERROR, WARN and INFO events passed to it.
+ * The counters are static, so they are shared by all instances of this appender
+ * and are read through the static getters.
+ */
+public class TajoLogEventCounter extends AppenderSkeleton {
+  // Slot of each tracked level inside EventCounts.
+  private static final int FATAL = 0;
+  private static final int ERROR = 1;
+  private static final int WARN = 2;
+  private static final int INFO = 3;
+
+  /** Fixed-size table of counters; every access is synchronized on the table. */
+  private static class EventCounts {
+
+    private final long[] counts = new long[4];
+
+    private synchronized void incr(int slot) {
+      counts[slot]++;
+    }
+
+    private synchronized long get(int slot) {
+      return counts[slot];
+    }
+  }
+
+  private static EventCounts counts = new EventCounts();
+
+  /** @return number of FATAL events counted so far */
+  public static long getFatal() {
+    return counts.get(FATAL);
+  }
+
+  /** @return number of ERROR events counted so far */
+  public static long getError() {
+    return counts.get(ERROR);
+  }
+
+  /** @return number of WARN events counted so far */
+  public static long getWarn() {
+    return counts.get(WARN);
+  }
+
+  /** @return number of INFO events counted so far */
+  public static long getInfo() {
+    return counts.get(INFO);
+  }
+
+  /**
+   * Increments the counter that corresponds to the level of the event.
+   * Events of any other level (for example DEBUG) are not counted.
+   */
+  @Override
+  public void append(LoggingEvent event) {
+    final Level eventLevel = event.getLevel();
+    final String levelName = eventLevel.toString();
+
+    if (isLevel(eventLevel, levelName, Level.INFO, "INFO")) {
+      counts.incr(INFO);
+      return;
+    }
+    if (isLevel(eventLevel, levelName, Level.WARN, "WARN")) {
+      counts.incr(WARN);
+      return;
+    }
+    if (isLevel(eventLevel, levelName, Level.ERROR, "ERROR")) {
+      counts.incr(ERROR);
+      return;
+    }
+    if (isLevel(eventLevel, levelName, Level.FATAL, "FATAL")) {
+      counts.incr(FATAL);
+    }
+  }
+
+  /**
+   * Tells whether an event level is the expected one, either by identity or by a
+   * case-insensitive comparison of the level names.
+   */
+  private static boolean isLevel(Level actual, String actualName, Level expected, String expectedName) {
+    return actual == expected || expectedName.equalsIgnoreCase(actualName);
+  }
+
+  /** Nothing to release. */
+  @Override
+  public void close() {
+  }
+
+  /** @return {@code false}; events are only counted, never formatted */
+  @Override
+  public boolean requiresLayout() {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/TajoMetrics.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/TajoMetrics.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/TajoMetrics.java
new file mode 100644
index 0000000..0e378b2
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/TajoMetrics.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics;
+
+import com.codahale.metrics.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.util.metrics.reporter.TajoMetricsReporter;
+
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Wrapper around a codahale {@link MetricRegistry} that builds metric names from a
+ * metrics group name, a context name and an item name.
+ */
+public class TajoMetrics {
+  private static final Log LOG = LogFactory.getLog(TajoMetrics.class);
+
+  protected MetricRegistry metricRegistry;
+  protected AtomicBoolean stop = new AtomicBoolean(false);
+  protected String metricsGroupName;
+
+  /**
+   * @param metricsGroupName first element of every metric name created through this object
+   */
+  public TajoMetrics(String metricsGroupName) {
+    this.metricRegistry = new MetricRegistry();
+    this.metricsGroupName = metricsGroupName;
+  }
+
+  /** Raises the stop flag that subclasses can poll. */
+  public void stop() {
+    stop.set(true);
+  }
+
+  /** @return the underlying registry */
+  public MetricRegistry getRegistry() {
+    return metricRegistry;
+  }
+
+  /**
+   * Hands all metrics of the registry to the reporter. A failure of the reporter is
+   * logged as a warning and not propagated; the stack trace is only included when
+   * debug logging is enabled.
+   */
+  public void report(TajoMetricsReporter reporter) {
+    try {
+      reporter.report(metricRegistry.getGauges(),
+          metricRegistry.getCounters(),
+          metricRegistry.getHistograms(),
+          metricRegistry.getMeters(),
+          metricRegistry.getTimers());
+    } catch (Exception e) {
+      final String message = "Metric report error:" + e.getMessage();
+      if(LOG.isDebugEnabled()) {
+        LOG.warn(message, e);
+        return;
+      }
+      LOG.warn(message);
+    }
+  }
+
+  /** @return every metric of the registry keyed by its full name */
+  public Map<String, Metric> getMetrics() {
+    return metricRegistry.getMetrics();
+  }
+
+  /**
+   * @param filter filter to apply; {@code null} selects all gauges
+   */
+  public SortedMap<String, Gauge> getGuageMetrics(MetricFilter filter) {
+    return metricRegistry.getGauges(orAll(filter));
+  }
+
+  /**
+   * @param filter filter to apply; {@code null} selects all counters
+   */
+  public SortedMap<String, Counter> getCounterMetrics(MetricFilter filter) {
+    return metricRegistry.getCounters(orAll(filter));
+  }
+
+  /**
+   * @param filter filter to apply; {@code null} selects all histograms
+   */
+  public SortedMap<String, Histogram> getHistogramMetrics(MetricFilter filter) {
+    return metricRegistry.getHistograms(orAll(filter));
+  }
+
+  /**
+   * @param filter filter to apply; {@code null} selects all meters
+   */
+  public SortedMap<String, Meter> getMeterMetrics(MetricFilter filter) {
+    return metricRegistry.getMeters(orAll(filter));
+  }
+
+  /**
+   * @param filter filter to apply; {@code null} selects all timers
+   */
+  public SortedMap<String, Timer> getTimerMetrics(MetricFilter filter) {
+    return metricRegistry.getTimers(orAll(filter));
+  }
+
+  /** Registers a metric set under {@code <group>.<contextName>}. */
+  public void register(String contextName, MetricSet metricSet) {
+    metricRegistry.register(MetricRegistry.name(metricsGroupName, contextName), metricSet);
+  }
+
+  /** Registers a gauge under {@code <group>.<contextName>.<itemName>}. */
+  public void register(String contextName, String itemName, Gauge gauge) {
+    metricRegistry.register(fullName(contextName, itemName), gauge);
+  }
+
+  /** @return the counter registered under {@code <group>.<contextName>.<itemName>} */
+  public Counter counter(String contextName, String itemName) {
+    return metricRegistry.counter(fullName(contextName, itemName));
+  }
+
+  /** @return the histogram registered under {@code <group>.<contextName>.<itemName>} */
+  public Histogram histogram(String contextName, String itemName) {
+    return metricRegistry.histogram(fullName(contextName, itemName));
+  }
+
+  /** @return the meter registered under {@code <group>.<contextName>.<itemName>} */
+  public Meter meter(String contextName, String itemName) {
+    return metricRegistry.meter(fullName(contextName, itemName));
+  }
+
+  /** @return the timer registered under {@code <group>.<contextName>.<itemName>} */
+  public Timer timer(String contextName, String itemName) {
+    return metricRegistry.timer(fullName(contextName, itemName));
+  }
+
+  /** Joins the three name elements with {@link MetricRegistry#name(String, String...)}. */
+  public static String makeMetricsName(String metricsGroupName, String contextName, String itemName) {
+    return MetricRegistry.name(metricsGroupName, contextName, itemName);
+  }
+
+  /** Builds the full metric name of an item that belongs to this object's group. */
+  private String fullName(String contextName, String itemName) {
+    return makeMetricsName(metricsGroupName, contextName, itemName);
+  }
+
+  /** Replaces a missing filter by the filter that accepts everything. */
+  private static MetricFilter orAll(MetricFilter filter) {
+    return filter == null ? MetricFilter.ALL : filter;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/TajoSystemMetrics.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/TajoSystemMetrics.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/TajoSystemMetrics.java
new file mode 100644
index 0000000..4192ca0
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/TajoSystemMetrics.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.jvm.FileDescriptorRatioGauge;
+import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
+import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
+import com.codahale.metrics.jvm.ThreadStatesGaugeSet;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.configuration.event.ConfigurationEvent;
+import org.apache.commons.configuration.event.ConfigurationListener;
+import org.apache.commons.configuration.reloading.FileChangedReloadingStrategy;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.util.metrics.reporter.TajoMetricsScheduledReporter;
+
+import java.util.*;
+
+/**
+ * {@link TajoMetrics} that additionally registers JVM metrics and runs the scheduled
+ * reporters declared in the metrics property file.
+ *
+ * <p>The property file is loaded with a file-changed reloading strategy; when a change
+ * is reported, all running reporters are closed and created again from the new content.
+ */
+public class TajoSystemMetrics extends TajoMetrics {
+  private static final Log LOG = LogFactory.getLog(TajoSystemMetrics.class);
+
+  private PropertiesConfiguration metricsProps;
+
+  // Stays null when the property file could not be loaded.
+  private Thread propertyChangeChecker;
+
+  private String hostAndPort;
+
+  // Guarded by synchronized(metricsReporters).
+  private List<TajoMetricsScheduledReporter> metricsReporters = new ArrayList<TajoMetricsScheduledReporter>();
+
+  private boolean inited = false;
+
+  private String metricsPropertyFileName;
+
+  /**
+   * Loads the metrics property file named by {@code METRICS_PROPERTY_FILENAME} and starts
+   * a background thread that keeps polling it for changes.
+   *
+   * <p>If the file cannot be loaded, the failure is logged and an empty configuration is
+   * used instead, so that {@link #start()} only warns about the missing reporter
+   * definitions instead of failing with a NullPointerException.
+   *
+   * @param tajoConf         configuration that names the metrics property file
+   * @param metricsGroupName metrics group name, also used as property key prefix
+   * @param hostAndPort      passed to every reporter on initialization
+   */
+  public TajoSystemMetrics(TajoConf tajoConf, String metricsGroupName, String hostAndPort) {
+    super(metricsGroupName);
+
+    this.hostAndPort = hostAndPort;
+    try {
+      this.metricsPropertyFileName = tajoConf.getVar(TajoConf.ConfVars.METRICS_PROPERTY_FILENAME);
+      this.metricsProps = new PropertiesConfiguration(metricsPropertyFileName);
+      this.metricsProps.addConfigurationListener(new MetricsReloadListener());
+      FileChangedReloadingStrategy reloadingStrategy = new FileChangedReloadingStrategy();
+      reloadingStrategy.setRefreshDelay(5 * 1000);
+      this.metricsProps.setReloadingStrategy(reloadingStrategy);
+    } catch (ConfigurationException e) {
+      LOG.warn(e.getMessage(), e);
+    }
+
+    if(this.metricsProps == null) {
+      // The file could not be loaded. Continue without any reporter definition;
+      // there is nothing to watch, so the checker thread is not started.
+      this.metricsProps = new PropertiesConfiguration();
+      return;
+    }
+
+    // PropertiesConfiguration fires configurationChanged only after a getXXX() call,
+    // so a getter needs to be called periodically.
+    propertyChangeChecker = new Thread() {
+      @Override
+      public void run() {
+        while(!stop.get()) {
+          // The returned value is not needed; the read itself triggers the reload check.
+          metricsProps.getString("reporter.file");
+          try {
+            Thread.sleep(10 * 1000);
+          } catch (InterruptedException e) {
+            // stop() raises the stop flag before interrupting, so the loop condition ends the thread.
+          }
+        }
+      }
+    };
+
+    propertyChangeChecker.start();
+  }
+
+  /**
+   * @return an unmodifiable snapshot of the currently running reporters; later reloads
+   *         do not affect the returned collection, so it can be iterated without locking
+   */
+  public Collection<TajoMetricsScheduledReporter> getMetricsReporters() {
+    synchronized (metricsReporters) {
+      return Collections.unmodifiableCollection(
+          new ArrayList<TajoMetricsScheduledReporter>(metricsReporters));
+    }
+  }
+
+  /** Stops the property checker thread and closes all reporters. */
+  @Override
+  public void stop() {
+    super.stop();
+    if(propertyChangeChecker != null) {
+      propertyChangeChecker.interrupt();
+    }
+    stopAndClearReporter();
+  }
+
+  /** Closes every running reporter and forgets it. */
+  protected void stopAndClearReporter() {
+    synchronized(metricsReporters) {
+      for(TajoMetricsScheduledReporter eachReporter: metricsReporters) {
+        eachReporter.close();
+      }
+
+      metricsReporters.clear();
+    }
+  }
+
+  /**
+   * Starts the reporters configured for the metrics group and for its
+   * {@code <group>-jvm} companion group. The JVM metric sets are registered on the
+   * first call only.
+   */
+  public void start() {
+    setMetricsReporter(metricsGroupName);
+
+    String jvmMetricsName = metricsGroupName + "-jvm";
+    setMetricsReporter(jvmMetricsName);
+
+    if(!inited) {
+      metricRegistry.register(MetricRegistry.name(jvmMetricsName, "Heap"), new MemoryUsageGaugeSet());
+      metricRegistry.register(MetricRegistry.name(jvmMetricsName, "File"), new FileDescriptorRatioGauge());
+      metricRegistry.register(MetricRegistry.name(jvmMetricsName, "GC"), new GarbageCollectorMetricSet());
+      metricRegistry.register(MetricRegistry.name(jvmMetricsName, "Thread"), new ThreadStatesGaugeSet());
+      metricRegistry.register(MetricRegistry.name(jvmMetricsName, "Log"), new LogEventGaugeSet());
+    }
+    inited = true;
+  }
+
+  /**
+   * Creates, initializes and starts the reporters listed in {@code <groupName>.reporters}.
+   * A reporter that is not defined, cannot be loaded or fails to start is logged and skipped.
+   */
+  private void setMetricsReporter(String groupName) {
+    //reporter name -> class name
+    Map<String, String> reporters = new HashMap<String, String>();
+
+    List<String> reporterNames = metricsProps.getList(groupName + ".reporters");
+    if(reporterNames.isEmpty()) {
+      LOG.warn("No property " + groupName + ".reporters in " + metricsPropertyFileName);
+      return;
+    }
+
+    Map<String, String> allReporterProperties = new HashMap<String, String>();
+
+    Iterator<String> keys = metricsProps.getKeys();
+    while (keys.hasNext()) {
+      String key = keys.next();
+      String value = metricsProps.getString(key);
+      if(key.startsWith("reporter.")) {
+        // "reporter.<name>" defines the class of the reporter called <name>.
+        String[] tokens = key.split("\\.");
+        if(tokens.length == 2) {
+          reporters.put(tokens[1], value);
+        }
+      } else if(key.startsWith(groupName + ".")) {
+        // Keys with more than two elements are kept as reporter properties of this group.
+        String[] tokens = key.split("\\.");
+        if(tokens.length > 2) {
+          allReporterProperties.put(key, value);
+        }
+      }
+    }
+
+    synchronized(metricsReporters) {
+      for(String eachReporterName: reporterNames) {
+        if("null".equals(eachReporterName)) {
+          continue;
+        }
+        String reporterClass = reporters.get(eachReporterName);
+        if(reporterClass == null) {
+          LOG.warn("No metrics reporter definition[" + eachReporterName + "] in " + metricsPropertyFileName);
+          continue;
+        }
+
+        Map<String, String> eachMetricsReporterProperties = findMetircsProperties(allReporterProperties,
+            groupName + "." + eachReporterName);
+
+        try {
+          Object reporterObject = Class.forName(reporterClass).newInstance();
+          if(!(reporterObject instanceof TajoMetricsScheduledReporter)) {
+            LOG.warn(reporterClass + " is not subclass of " + TajoMetricsScheduledReporter.class.getCanonicalName());
+            continue;
+          }
+          TajoMetricsScheduledReporter reporter = (TajoMetricsScheduledReporter)reporterObject;
+          reporter.init(metricRegistry, groupName, hostAndPort, eachMetricsReporterProperties);
+          reporter.start();
+
+          metricsReporters.add(reporter);
+          LOG.info("Started metrics reporter " + reporter.getClass().getCanonicalName() + " for " + groupName);
+        } catch (ClassNotFoundException e) {
+          LOG.warn("No metrics reporter class[" + eachReporterName + "], required class= " + reporterClass);
+        } catch (Exception e) {
+          LOG.warn("Can't initiate metrics reporter class[" + eachReporterName + "]" + e.getMessage() , e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Selects the properties whose key starts with {@code findKey}.
+   *
+   * NOTE(review): this is a plain prefix match without a trailing dot, so "group.file"
+   * also selects keys of a reporter named "file2" - confirm whether that is intended.
+   */
+  private Map<String, String> findMetircsProperties(Map<String, String> allReporterProperties, String findKey) {
+    Map<String, String> metricsProperties = new HashMap<String, String>();
+
+    for (Map.Entry<String, String> entry: allReporterProperties.entrySet()) {
+      String eachKey = entry.getKey();
+      if (eachKey.startsWith(findKey)) {
+        metricsProperties.put(eachKey, entry.getValue());
+      }
+    }
+    return metricsProperties;
+  }
+
+  /** Recreates all reporters after the property file has been reloaded. */
+  class MetricsReloadListener implements ConfigurationListener {
+    @Override
+    public synchronized void configurationChanged(ConfigurationEvent event) {
+      // Every change is announced twice; only react once the update has been applied.
+      if (!event.isBeforeUpdate()) {
+        stopAndClearReporter();
+        start();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/GangliaReporter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/GangliaReporter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/GangliaReporter.java
new file mode 100644
index 0000000..b9acf0e
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/GangliaReporter.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics.reporter;
+
+import com.codahale.metrics.*;
+import info.ganglia.gmetric4j.gmetric.GMetric;
+import info.ganglia.gmetric4j.gmetric.GMetricSlope;
+import info.ganglia.gmetric4j.gmetric.GMetricType;
+import info.ganglia.gmetric4j.gmetric.GangliaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.SortedMap;
+
+import static com.codahale.metrics.MetricRegistry.name;
+
+/**
+ * Scheduled reporter that announces the metrics it is given to Ganglia through a
+ * gmetric4j {@link GMetric} client.
+ *
+ * <p>The client is created in {@link #afterInit()} from the {@code server} and
+ * {@code port} reporter properties. When it could not be created, reporting is a no-op.
+ */
+public class GangliaReporter extends TajoMetricsScheduledReporter {
+  private static final Logger LOG = LoggerFactory.getLogger(GangliaReporter.class);
+  public static final String REPORTER_NAME = "ganglia";
+
+  // Stays null when afterInit() could not create the client.
+  private GMetric ganglia;
+  private String prefix;
+  private int tMax = 60;
+  private int dMax = 0;
+
+  @Override
+  protected String getReporterName() {
+    return REPORTER_NAME;
+  }
+
+  /**
+   * Creates the Ganglia client in multicast mode. A missing {@code server} or
+   * {@code port} property, or any failure while creating the client (including a
+   * non-numeric port), is logged and leaves the reporter without a client.
+   */
+  @Override
+  protected void afterInit() {
+    String server = metricsProperties.get(metricsPropertyKey + "server");
+    String port = metricsProperties.get(metricsPropertyKey + "port");
+
+    if(server == null || server.isEmpty()) {
+      LOG.warn("No " + metricsPropertyKey + "server property in tajo-metrics.properties");
+      return;
+    }
+
+    if(port == null || port.isEmpty()) {
+      LOG.warn("No " + metricsPropertyKey + "port property in tajo-metrics.properties");
+      return;
+    }
+
+    try {
+      ganglia = new GMetric(server, Integer.parseInt(port), GMetric.UDPAddressingMode.MULTICAST, 1);
+    } catch (Exception e) {
+      LOG.warn(e.getMessage(), e);
+    }
+  }
+
+  /** @param prefix name element put in front of every announced metric name */
+  public void setPrefix(String prefix) {
+    this.prefix = prefix;
+  }
+
+  /** @param tMax value passed as tmax argument of every announcement (default 60) */
+  public void settMax(int tMax) {
+    this.tMax = tMax;
+  }
+
+  /** @param dMax value passed as dmax argument of every announcement (default 0) */
+  public void setdMax(int dMax) {
+    this.dMax = dMax;
+  }
+
+  /**
+   * Announces all given metrics. Does nothing when no Ganglia client is available,
+   * because {@link #afterInit()} has already logged the reason; without this check
+   * every announcement would fail with a NullPointerException.
+   */
+  @Override
+  public void report(SortedMap<String, Gauge> gauges,
+                     SortedMap<String, Counter> counters,
+                     SortedMap<String, Histogram> histograms,
+                     SortedMap<String, Meter> meters,
+                     SortedMap<String, Timer> timers) {
+    if (ganglia == null) {
+      return;
+    }
+
+    for (Map.Entry<String, Gauge> entry : gauges.entrySet()) {
+      reportGauge(entry.getKey(), entry.getValue());
+    }
+
+    for (Map.Entry<String, Counter> entry : counters.entrySet()) {
+      reportCounter(entry.getKey(), entry.getValue());
+    }
+
+    for (Map.Entry<String, Histogram> entry : histograms.entrySet()) {
+      reportHistogram(entry.getKey(), entry.getValue());
+    }
+
+    for (Map.Entry<String, Meter> entry : meters.entrySet()) {
+      reportMeter(entry.getKey(), entry.getValue());
+    }
+
+    for (Map.Entry<String, Timer> entry : timers.entrySet()) {
+      reportTimer(entry.getKey(), entry.getValue());
+    }
+  }
+
+  /** Announces the converted snapshot statistics of a timer followed by its rates. */
+  private void reportTimer(String name, Timer timer) {
+    final String group = group(name);
+    try {
+      final Snapshot snapshot = timer.getSnapshot();
+
+      announce(prefix(name, "max"), group, convertDuration(snapshot.getMax()), getDurationUnit());
+      announce(prefix(name, "mean"), group, convertDuration(snapshot.getMean()), getDurationUnit());
+      announce(prefix(name, "min"), group, convertDuration(snapshot.getMin()), getDurationUnit());
+      announce(prefix(name, "stddev"), group, convertDuration(snapshot.getStdDev()), getDurationUnit());
+
+      announce(prefix(name, "p50"), group, convertDuration(snapshot.getMedian()), getDurationUnit());
+      announce(prefix(name, "p75"), group, convertDuration(snapshot.get75thPercentile()), getDurationUnit());
+      announce(prefix(name, "p95"), group, convertDuration(snapshot.get95thPercentile()), getDurationUnit());
+      announce(prefix(name, "p98"), group, convertDuration(snapshot.get98thPercentile()), getDurationUnit());
+      announce(prefix(name, "p99"), group, convertDuration(snapshot.get99thPercentile()), getDurationUnit());
+      announce(prefix(name, "p999"), group, convertDuration(snapshot.get999thPercentile()), getDurationUnit());
+
+      reportMetered(name, timer, group, "calls");
+    } catch (GangliaException e) {
+      LOG.warn("Unable to report timer {}", name, e);
+    }
+  }
+
+  /** Announces the count and rates of a meter. */
+  private void reportMeter(String name, Meter meter) {
+    final String group = group(name);
+    try {
+      reportMetered(name, meter, group, "events");
+    } catch (GangliaException e) {
+      LOG.warn("Unable to report meter {}", name, e);
+    }
+  }
+
+  /**
+   * Announces the count and the converted 1, 5 and 15 minute and mean rates of a metered metric.
+   *
+   * @param eventName unit of the count; the rates use {@code eventName/<rate unit>}
+   */
+  private void reportMetered(String name, Metered meter, String group, String eventName) throws GangliaException {
+    final String unit = eventName + '/' + getRateUnit();
+    announce(prefix(name, "count"), group, meter.getCount(), eventName);
+    announce(prefix(name, "m1_rate"), group, convertRate(meter.getOneMinuteRate()), unit);
+    announce(prefix(name, "m5_rate"), group, convertRate(meter.getFiveMinuteRate()), unit);
+    announce(prefix(name, "m15_rate"), group, convertRate(meter.getFifteenMinuteRate()), unit);
+    announce(prefix(name, "mean_rate"), group, convertRate(meter.getMeanRate()), unit);
+  }
+
+  /** Announces the count and the unconverted snapshot statistics of a histogram. */
+  private void reportHistogram(String name, Histogram histogram) {
+    final String group = group(name);
+    try {
+      final Snapshot snapshot = histogram.getSnapshot();
+
+      announce(prefix(name, "count"), group, histogram.getCount(), "");
+      announce(prefix(name, "max"), group, snapshot.getMax(), "");
+      announce(prefix(name, "mean"), group, snapshot.getMean(), "");
+      announce(prefix(name, "min"), group, snapshot.getMin(), "");
+      announce(prefix(name, "stddev"), group, snapshot.getStdDev(), "");
+      announce(prefix(name, "p50"), group, snapshot.getMedian(), "");
+      announce(prefix(name, "p75"), group, snapshot.get75thPercentile(), "");
+      announce(prefix(name, "p95"), group, snapshot.get95thPercentile(), "");
+      announce(prefix(name, "p98"), group, snapshot.get98thPercentile(), "");
+      announce(prefix(name, "p99"), group, snapshot.get99thPercentile(), "");
+      announce(prefix(name, "p999"), group, snapshot.get999thPercentile(), "");
+    } catch (GangliaException e) {
+      LOG.warn("Unable to report histogram {}", name, e);
+    }
+  }
+
+  /** Announces the current value of a counter as {@code <name>.count}. */
+  private void reportCounter(String name, Counter counter) {
+    final String group = group(name);
+    try {
+      announce(prefix(name, "count"), group, counter.getCount(), "");
+    } catch (GangliaException e) {
+      LOG.warn("Unable to report counter {}", name, e);
+    }
+  }
+
+  /**
+   * Announces the string form of a gauge value under the gauge's own name, with the
+   * Ganglia type chosen by {@link #detectType(Object)}.
+   */
+  private void reportGauge(String name, Gauge gauge) {
+    final String group = group(name);
+    final Object obj = gauge.getValue();
+
+    try {
+      ganglia.announce(name(prefix, name), String.valueOf(obj), detectType(obj), "",
+          GMetricSlope.BOTH, tMax, dMax, group);
+    } catch (GangliaException e) {
+      LOG.warn("Unable to report gauge {}", name, e);
+    }
+  }
+
+  /** Announces a double value as a DOUBLE metric. */
+  private void announce(String name, String group, double value, String units) throws GangliaException {
+    ganglia.announce(name,
+        Double.toString(value),
+        GMetricType.DOUBLE,
+        units,
+        GMetricSlope.BOTH,
+        tMax,
+        dMax,
+        group);
+  }
+
+  /** Announces a long value; it is sent in its integer string form but typed as DOUBLE. */
+  private void announce(String name, String group, long value, String units) throws GangliaException {
+    final String v = Long.toString(value);
+    ganglia.announce(name,
+        v,
+        GMetricType.DOUBLE,
+        units,
+        GMetricSlope.BOTH,
+        tMax,
+        dMax,
+        group);
+  }
+
+  /**
+   * Maps the runtime type of a gauge value to a Ganglia metric type. Long values are
+   * typed as DOUBLE, like in the long overload of announce; anything that is not a
+   * boxed number listed here (including {@code null}) is typed as STRING.
+   */
+  private GMetricType detectType(Object o) {
+    if (o instanceof Float) {
+      return GMetricType.FLOAT;
+    } else if (o instanceof Double) {
+      return GMetricType.DOUBLE;
+    } else if (o instanceof Byte) {
+      return GMetricType.INT8;
+    } else if (o instanceof Short) {
+      return GMetricType.INT16;
+    } else if (o instanceof Integer) {
+      return GMetricType.INT32;
+    } else if (o instanceof Long) {
+      return GMetricType.DOUBLE;
+    }
+    return GMetricType.STRING;
+  }
+
+  /**
+   * Derives the Ganglia group from a metric name.
+   *
+   * @return the first two dot-separated elements of {@code name} joined by a dot, or
+   *         an empty string when the name has fewer than three elements
+   */
+  private String group(String name) {
+    String[] tokens = name.split("\\.");
+    if(tokens.length < 3) {
+      return "";
+    }
+    return tokens[0] + "." + tokens[1];
+  }
+
+  /** Builds {@code <prefix>.<name>.<n>} with {@code MetricRegistry.name}. */
+  private String prefix(String name, String n) {
+    return name(prefix, name, n);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsConsoleReporter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsConsoleReporter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsConsoleReporter.java
new file mode 100644
index 0000000..80b77f1
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsConsoleReporter.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics.reporter;
+
+import com.codahale.metrics.*;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Reporter that writes the string form of every metric group to {@code System.out}.
+ * Grouping and formatting are done by the helpers inherited from {@link TajoMetricsReporter}.
+ */
+public class MetricsConsoleReporter extends TajoMetricsReporter {
+  /**
+   * Prints gauges, counters, histograms, meters and timers, in this order. All lines
+   * of one call carry the same timestamp; a metric type without entries is skipped.
+   */
+  @Override
+  public void report(SortedMap<String, Gauge> gauges,
+                     SortedMap<String, Counter> counters,
+                     SortedMap<String, Histogram> histograms,
+                     SortedMap<String, Meter> meters,
+                     SortedMap<String, Timer> timers) {
+    final SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    final String dateTime = timestampFormat.format(new Date());
+    final double rateFactor = TimeUnit.SECONDS.toSeconds(1);
+
+    if (!gauges.isEmpty()) {
+      Map<String, Map<String, Gauge>> groups = findMetricsItemGroup(gauges);
+      for(Map.Entry<String, Map<String, Gauge>> group: groups.entrySet()) {
+        System.out.println(gaugeGroupToString(dateTime, null, rateFactor, group.getKey(), group.getValue()));
+      }
+    }
+
+    if (!counters.isEmpty()) {
+      Map<String, Map<String, Counter>> groups = findMetricsItemGroup(counters);
+      for(Map.Entry<String, Map<String, Counter>> group: groups.entrySet()) {
+        System.out.println(counterGroupToString(dateTime, null, rateFactor, group.getKey(), group.getValue()));
+      }
+    }
+
+    if (!histograms.isEmpty()) {
+      Map<String, Map<String, Histogram>> groups = findMetricsItemGroup(histograms);
+      for(Map.Entry<String, Map<String, Histogram>> group: groups.entrySet()) {
+        System.out.println(histogramGroupToString(dateTime, null, rateFactor, group.getKey(), group.getValue()));
+      }
+    }
+
+    if (!meters.isEmpty()) {
+      Map<String, Map<String, Meter>> groups = findMetricsItemGroup(meters);
+      for(Map.Entry<String, Map<String, Meter>> group: groups.entrySet()) {
+        System.out.println(meterGroupToString(dateTime, null, rateFactor, group.getKey(), group.getValue()));
+      }
+    }
+
+    if (!timers.isEmpty()) {
+      Map<String, Map<String, Timer>> groups = findMetricsItemGroup(timers);
+      for(Map.Entry<String, Map<String, Timer>> group: groups.entrySet()) {
+        System.out.println(timerGroupToString(dateTime, null, rateFactor, group.getKey(), group.getValue()));
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsConsoleScheduledReporter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsConsoleScheduledReporter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsConsoleScheduledReporter.java
new file mode 100644
index 0000000..286ef8d
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsConsoleScheduledReporter.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics.reporter;
+
+/**
+ * Scheduled stream reporter whose reporter name is "console" and whose output
+ * is set to {@code System.out}.
+ */
+public class MetricsConsoleScheduledReporter extends MetricsStreamScheduledReporter {
+  public static final String REPORTER_NAME = "console";
+  /** @return {@link #REPORTER_NAME} */
+  @Override
+  protected String getReporterName() {
+    return REPORTER_NAME;
+  }
+
+  /** Directs the inherited stream output to standard output. */
+  @Override
+  protected void afterInit() {
+    setOutput(System.out);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsFileScheduledReporter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsFileScheduledReporter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsFileScheduledReporter.java
new file mode 100644
index 0000000..35dd6f1
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsFileScheduledReporter.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics.reporter;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+
+/**
+ * Stream-based metrics reporter registered under the name {@code "file"}. The target
+ * file is taken from the {@code <metricsPropertyKey>filename} metrics property and is
+ * opened in append mode.
+ */
+public class MetricsFileScheduledReporter extends MetricsStreamScheduledReporter {
+  private static final Log LOG = LogFactory.getLog(MetricsFileScheduledReporter.class);
+  public static final String REPORTER_NAME = "file";
+
+  /**
+   * @return the reporter name, {@link #REPORTER_NAME}
+   */
+  @Override
+  protected String getReporterName() {
+    return REPORTER_NAME;
+  }
+
+  /**
+   * Opens the configured metrics file for appending and installs it as the output
+   * stream. Missing parent directories are created first. When the property is absent
+   * or the file cannot be opened, a warning is logged and no output stream is set.
+   */
+  @Override
+  protected void afterInit() {
+    String fileName = metricsProperties.get(metricsPropertyKey + "filename");
+    if (fileName == null) {
+      LOG.warn("No " + metricsPropertyKey + "filename property in tajo-metrics.properties");
+      return;
+    }
+
+    File file = new File(fileName);
+    File parentFile = file.getParentFile();
+    if (parentFile != null && !parentFile.exists() && !parentFile.mkdirs()) {
+      // Keep going: opening the file below reports the definitive failure, if any.
+      LOG.warn("Can't create dir for tajo metrics:" + parentFile.getAbsolutePath());
+    }
+
+    try {
+      this.setOutput(new FileOutputStream(file, true));
+      // A null date format makes report() print the raw clock value instead of a formatted date.
+      this.setDateFormat(null);
+    } catch (FileNotFoundException e) {
+      // Pass the exception along so the reason (permissions, path is a directory, ...) is kept.
+      LOG.warn("Can't open metrics file:" + fileName, e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsStreamScheduledReporter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsStreamScheduledReporter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsStreamScheduledReporter.java
new file mode 100644
index 0000000..4fbefd7
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsStreamScheduledReporter.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics.reporter;
+
+import com.codahale.metrics.*;
+import com.codahale.metrics.Timer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+public abstract class MetricsStreamScheduledReporter extends TajoMetricsScheduledReporter {
+  private static final Log LOG = LogFactory.getLog(MetricsStreamScheduledReporter.class);
+
+  protected OutputStream output;
+  protected Locale locale;
+  protected Clock clock;
+  protected TimeZone timeZone;
+  protected MetricFilter filter;
+  protected DateFormat dateFormat;
+
+  private final byte[] NEW_LINE = "\n".getBytes();
+
+  public MetricsStreamScheduledReporter() {
+    dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    clock = Clock.defaultClock();
+  }
+
+  public void setOutput(OutputStream output) {
+    this.output = output;
+  }
+
+  public void setLocale(Locale locale) {
+    this.locale = locale;
+  }
+
+  public void setClock(Clock clock) {
+    this.clock = clock;
+  }
+
+  public void setTimeZone(TimeZone timeZone) {
+    this.dateFormat.setTimeZone(timeZone);
+    this.timeZone = timeZone;
+  }
+
+  public void setDateFormat(DateFormat dateFormat) {
+    this.dateFormat = dateFormat;
+  }
+
+  @Override
+  public void report(SortedMap<String, Gauge> gauges,
+                     SortedMap<String, Counter> counters,
+                     SortedMap<String, Histogram> histograms,
+                     SortedMap<String, Meter> meters,
+                     SortedMap<String, Timer> timers) {
+    final String dateTime = dateFormat == null ? "" + clock.getTime() : dateFormat.format(new Date(clock.getTime()));
+
+    if (!gauges.isEmpty()) {
+      Map<String, Map<String, Gauge>> gaugeGroups = findMetricsItemGroup(gauges);
+
+      for(Map.Entry<String, Map<String, Gauge>> eachGroup: gaugeGroups.entrySet()) {
+        printGaugeGroup(dateTime, eachGroup.getKey(), eachGroup.getValue());
+      }
+    }
+
+    if (!counters.isEmpty()) {
+      Map<String, Map<String, Counter>> counterGroups = findMetricsItemGroup(counters);
+
+      for(Map.Entry<String, Map<String, Counter>> eachGroup: counterGroups.entrySet()) {
+        printCounterGroup(dateTime, eachGroup.getKey(), eachGroup.getValue());
+      }
+    }
+
+    if (!histograms.isEmpty()) {
+      Map<String, Map<String, Histogram>> histogramGroups = findMetricsItemGroup(histograms);
+
+      for(Map.Entry<String, Map<String, Histogram>> eachGroup: histogramGroups.entrySet()) {
+        printHistogramGroup(dateTime, eachGroup.getKey(), eachGroup.getValue());
+      }
+    }
+
+    if (!meters.isEmpty()) {
+      Map<String, Map<String, Meter>> meterGroups = findMetricsItemGroup(meters);
+
+      for(Map.Entry<String, Map<String, Meter>> eachGroup: meterGroups.entrySet()) {
+        printMeterGroup(dateTime, eachGroup.getKey(), eachGroup.getValue());
+      }
+    }
+
+    if (!timers.isEmpty()) {
+      Map<String, Map<String, Timer>> timerGroups = findMetricsItemGroup(timers);
+
+      for(Map.Entry<String, Map<String, Timer>> eachGroup: timerGroups.entrySet()) {
+        printTimerGroup(dateTime, eachGroup.getKey(), eachGroup.getValue());
+      }
+    }
+    try {
+      output.flush();
+    } catch (IOException e) {
+    }
+  }
+
+  private void printMeterGroup(String dateTime, String groupName, Map<String, Meter> meters) {
+    try {
+      output.write(meterGroupToString(dateTime, hostAndPort, rateFactor, groupName, meters).getBytes());
+      output.write(NEW_LINE);
+    } catch (Exception e) {
+      LOG.warn(e.getMessage(), e);
+    }
+  }
+
+  private void printCounterGroup(String dateTime, String groupName, Map<String, Counter> counters) {
+    try {
+      output.write(counterGroupToString(dateTime, hostAndPort, rateFactor, groupName, counters).getBytes());
+      output.write(NEW_LINE);
+    } catch (Exception e) {
+      LOG.warn(e.getMessage(), e);
+    }
+  }
+
+  private void printGaugeGroup(String dateTime, String groupName, Map<String, Gauge> gauges) {
+    try {
+      output.write(gaugeGroupToString(dateTime, hostAndPort, rateFactor, groupName, gauges).getBytes());
+      output.write(NEW_LINE);
+    } catch (Exception e) {
+      LOG.warn(e.getMessage(), e);
+    }
+  }
+
+  private void printHistogramGroup(String dateTime, String groupName, Map<String, Histogram> histograms) {
+    try {
+      output.write(histogramGroupToString(dateTime, hostAndPort, rateFactor, groupName, histograms).getBytes());
+      output.write(NEW_LINE);
+    } catch (Exception e) {
+      LOG.warn(e.getMessage(), e);
+    }
+  }
+
+  private void printTimerGroup(String dateTime, String groupName, Map<String, Timer> timers) {
+    try {
+      output.write(timerGroupToString(dateTime, hostAndPort, rateFactor, groupName, timers).getBytes());
+      output.write(NEW_LINE);
+    } catch (Exception e) {
+      LOG.warn(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public void close() {
+    if(output != null) {
+      try {
+        output.close();
+      } catch (IOException e) {
+      }
+    }
+
+    super.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/NullReporter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/NullReporter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/NullReporter.java
new file mode 100644
index 0000000..9dc1755
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/NullReporter.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics.reporter;
+
+import com.codahale.metrics.*;
+
+import java.util.SortedMap;
+
+/**
+ * A {@link TajoMetricsReporter} whose {@code report} implementation is empty, so every
+ * metric handed to it is discarded.
+ */
+public class NullReporter extends TajoMetricsReporter {
+  /**
+   * Does nothing with the supplied metrics.
+   */
+  @Override
+  public void report(final SortedMap<String, Gauge> gauges,
+                     final SortedMap<String, Counter> counters,
+                     final SortedMap<String, Histogram> histograms,
+                     final SortedMap<String, Meter> meters,
+                     final SortedMap<String, Timer> timers) {
+    // intentionally empty
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsReporter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsReporter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsReporter.java
new file mode 100644
index 0000000..a32a913
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsReporter.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics.reporter;
+
+import com.codahale.metrics.*;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.SortedMap;
+
+/**
+ * Base class of the Tajo metrics reporters. Besides the abstract {@link #report} hook it
+ * offers helpers that split metric names into groups and render each group as one text
+ * line of the form
+ * {@code <dateTime> [<hostAndPort>] <type> [<groupName>] <item>[,<item>...]}.
+ */
+public abstract class TajoMetricsReporter {
+  /**
+   * Reports the given metrics; each map is keyed by the full metric name.
+   */
+  public abstract void report(SortedMap<String, Gauge> gauges,
+                              SortedMap<String, Counter> counters,
+                              SortedMap<String, Histogram> histograms,
+                              SortedMap<String, Meter> meters,
+                              SortedMap<String, Timer> timers);
+
+  /**
+   * Splits metrics into groups by the first two dot-separated tokens of their name.
+   * A name with more than two tokens, e.g. {@code a.b.c.d}, lands in group {@code a.b}
+   * as item {@code c.d}; any shorter name lands in the group {@code ""} under its full
+   * name. Groups and items are returned in the iteration order of {@code metricsMap}.
+   *
+   * @param metricsMap metrics keyed by full metric name
+   * @return group name mapped to (item name mapped to metric)
+   */
+  public <T> Map<String, Map<String, T>> findMetricsItemGroup(SortedMap<String, T> metricsMap) {
+    Map<String, Map<String, T>> metricsGroup = new LinkedHashMap<String, Map<String, T>>();
+
+    for (Map.Entry<String, T> entry : metricsMap.entrySet()) {
+      String key = entry.getKey();
+      String[] keyTokens = key.split("\\.");
+
+      String groupName;
+      String itemName;
+      if (keyTokens.length > 2) {
+        groupName = keyTokens[0] + "." + keyTokens[1];
+        StringBuilder item = new StringBuilder(keyTokens[2]);
+        for (int i = 3; i < keyTokens.length; i++) {
+          item.append('.').append(keyTokens[i]);
+        }
+        itemName = item.toString();
+      } else {
+        groupName = "";
+        itemName = key;
+      }
+
+      // Look the group up by name instead of relying on same-group keys being adjacent,
+      // so an earlier group can never be overwritten by a later run of the same group.
+      Map<String, T> groupItems = metricsGroup.get(groupName);
+      if (groupItems == null) {
+        groupItems = new LinkedHashMap<String, T>();
+        metricsGroup.put(groupName, groupItems);
+      }
+      groupItems.put(itemName, entry.getValue());
+    }
+
+    return metricsGroup;
+  }
+
+  /**
+   * Starts a report line: timestamp, optional host and port, metric type and optional
+   * group name, each followed by a single space.
+   */
+  private static StringBuilder startLine(String dateTime, String hostAndPort,
+                                         String type, String groupName) {
+    StringBuilder sb = new StringBuilder();
+    sb.append(dateTime).append(" ");
+    if (hostAndPort != null && !hostAndPort.isEmpty()) {
+      sb.append(hostAndPort).append(" ");
+    }
+    sb.append(type).append(" ");
+    if (groupName != null && !groupName.isEmpty()) {
+      sb.append(groupName).append(" ");
+    }
+    return sb;
+  }
+
+  /** Formats a value with two fraction digits. */
+  private static String format(double value) {
+    return String.format("%2.2f", value);
+  }
+
+  /**
+   * Renders a group of meters as one line. Each meter contributes its count and its
+   * mean, 1, 5 and 15 minute rates (scaled by {@code rateFactor}) separated by
+   * {@code |}; meters are separated by {@code ,}.
+   */
+  protected String meterGroupToString(String dateTime, String hostAndPort, double rateFactor,
+                                      String groupName, Map<String, Meter> meters) {
+    StringBuilder sb = startLine(dateTime, hostAndPort, "meter", groupName);
+
+    String prefix = "";
+    for (Map.Entry<String, Meter> eachMeter : meters.entrySet()) {
+      String key = eachMeter.getKey();
+      Meter meter = eachMeter.getValue();
+      sb.append(prefix);
+      sb.append(key).append(".count=").append(meter.getCount()).append("|");
+      sb.append(key).append(".mean=").append(format(convertRate(meter.getMeanRate(), rateFactor))).append("|");
+      sb.append(key).append(".1minute=").append(format(convertRate(meter.getOneMinuteRate(), rateFactor))).append("|");
+      sb.append(key).append(".5minute=").append(format(convertRate(meter.getFiveMinuteRate(), rateFactor))).append("|");
+      sb.append(key).append(".15minute=").append(format(convertRate(meter.getFifteenMinuteRate(), rateFactor)));
+      prefix = ",";
+    }
+
+    return sb.toString();
+  }
+
+  /**
+   * Renders a group of counters as one line of comma-separated {@code name=count} pairs.
+   */
+  protected String counterGroupToString(String dateTime, String hostAndPort, double rateFactor,
+                                        String groupName, Map<String, Counter> counters) {
+    StringBuilder sb = startLine(dateTime, hostAndPort, "counter", groupName);
+
+    String prefix = "";
+    for (Map.Entry<String, Counter> eachCounter : counters.entrySet()) {
+      sb.append(prefix);
+      sb.append(eachCounter.getKey()).append("=").append(eachCounter.getValue().getCount());
+      prefix = ",";
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Renders a group of gauges as one line of comma-separated {@code name=value} pairs.
+   */
+  protected String gaugeGroupToString(String dateTime, String hostAndPort, double rateFactor,
+                                      String groupName, Map<String, Gauge> gauges) {
+    // The type label used to be misspelled as "guage".
+    StringBuilder sb = startLine(dateTime, hostAndPort, "gauge", groupName);
+
+    String prefix = "";
+    for (Map.Entry<String, Gauge> eachGauge : gauges.entrySet()) {
+      sb.append(prefix).append(eachGauge.getKey()).append("=").append(eachGauge.getValue().getValue());
+      prefix = ",";
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Renders a group of histograms as one line. Each histogram contributes its count and
+   * the statistics of its snapshot separated by {@code |}; histograms are separated by
+   * {@code ,}. The group name is part of the line, as it is for every other metric type.
+   */
+  protected String histogramGroupToString(String dateTime, String hostAndPort, double rateFactor,
+                                          String groupName, Map<String, Histogram> histograms) {
+    StringBuilder sb = startLine(dateTime, hostAndPort, "histo", groupName);
+
+    String prefix = "";
+    for (Map.Entry<String, Histogram> eachHistogram : histograms.entrySet()) {
+      String key = eachHistogram.getKey();
+      Histogram histogram = eachHistogram.getValue();
+      Snapshot snapshot = histogram.getSnapshot();
+
+      sb.append(prefix);
+      sb.append(key).append(".count=").append(histogram.getCount()).append("|");
+      sb.append(key).append(".min=").append(snapshot.getMin()).append("|");
+      sb.append(key).append(".max=").append(snapshot.getMax()).append("|");
+      sb.append(key).append(".mean=").append(format(snapshot.getMean())).append("|");
+      sb.append(key).append(".stddev=").append(format(snapshot.getStdDev())).append("|");
+      sb.append(key).append(".median=").append(format(snapshot.getMedian())).append("|");
+      sb.append(key).append(".75%=").append(format(snapshot.get75thPercentile())).append("|");
+      sb.append(key).append(".95%=").append(format(snapshot.get95thPercentile())).append("|");
+      sb.append(key).append(".98%=").append(format(snapshot.get98thPercentile())).append("|");
+      sb.append(key).append(".99%=").append(format(snapshot.get99thPercentile())).append("|");
+      sb.append(key).append(".999%=").append(format(snapshot.get999thPercentile()));
+      prefix = ",";
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Renders a group of timers as one line. Each timer contributes its count, its rates
+   * and the statistics of its snapshot separated by {@code |}; timers are separated by
+   * {@code ,}. Every numeric value except the count is multiplied by {@code rateFactor}.
+   */
+  protected String timerGroupToString(String dateTime, String hostAndPort, double rateFactor,
+                                 String groupName, Map<String, Timer> timers) {
+    StringBuilder sb = startLine(dateTime, hostAndPort, "timer", groupName);
+
+    String prefix = "";
+    for (Map.Entry<String, Timer> eachTimer : timers.entrySet()) {
+      String key = eachTimer.getKey();
+      Timer timer = eachTimer.getValue();
+      Snapshot snapshot = timer.getSnapshot();
+
+      sb.append(prefix);
+      sb.append(key).append(".count=").append(timer.getCount()).append("|");
+      sb.append(key).append(".meanrate=").append(format(convertRate(timer.getMeanRate(), rateFactor))).append("|");
+      sb.append(key).append(".1minuterate=").append(format(convertRate(timer.getOneMinuteRate(), rateFactor))).append("|");
+      sb.append(key).append(".5minuterate=").append(format(convertRate(timer.getFiveMinuteRate(), rateFactor))).append("|");
+      sb.append(key).append(".15minuterate=").append(format(convertRate(timer.getFifteenMinuteRate(), rateFactor))).append("|");
+      // NOTE(review): the snapshot values below look like durations rather than rates, yet
+      // they are scaled with rateFactor; confirm whether a duration factor is needed here.
+      sb.append(key).append(".min=").append(format(convertRate(snapshot.getMin(), rateFactor))).append("|");
+      sb.append(key).append(".max=").append(format(convertRate(snapshot.getMax(), rateFactor))).append("|");
+      sb.append(key).append(".mean=").append(format(convertRate(snapshot.getMean(), rateFactor))).append("|");
+      sb.append(key).append(".stddev=").append(format(convertRate(snapshot.getStdDev(), rateFactor))).append("|");
+      sb.append(key).append(".median=").append(format(convertRate(snapshot.getMedian(), rateFactor))).append("|");
+      sb.append(key).append(".75%=").append(format(convertRate(snapshot.get75thPercentile(), rateFactor))).append("|");
+      sb.append(key).append(".95%=").append(format(convertRate(snapshot.get95thPercentile(), rateFactor))).append("|");
+      sb.append(key).append(".98%=").append(format(convertRate(snapshot.get98thPercentile(), rateFactor))).append("|");
+      sb.append(key).append(".99%=").append(format(convertRate(snapshot.get99thPercentile(), rateFactor))).append("|");
+      sb.append(key).append(".999%=").append(format(convertRate(snapshot.get999thPercentile(), rateFactor)));
+      prefix = ",";
+    }
+
+    return sb.toString();
+  }
+
+  /**
+   * @return {@code rate} multiplied by {@code rateFactor}
+   */
+  protected double convertRate(double rate, double rateFactor) {
+    return rate * rateFactor;
+  }
+
+}